Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/12 23:44:18 UTC
[18/22] flink git commit: [FLINK-6731] [tests] Activate strict checkstyle for flink-tests
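
The hunks in this message apply the same strict-checkstyle fixes throughout flink-tests: imports regrouped (Flink first, other third-party next, java.* last, static imports at the end), class-level Javadoc added, static constants made final, helper classes narrowed to private, and whitespace normalized around keywords and operators. A minimal sketch of the target style (CheckstyleStyleExample is a hypothetical class, not part of the patch):

/**
 * Classes gain a short Javadoc like this one.
 */
public class CheckstyleStyleExample {

    private static final int NUM_PROGRAMS = 2; // static constants become final

    public void run() {
        // a space follows control-flow keywords, and operators are spaced out
        for (int i = 1; i <= NUM_PROGRAMS; i++) {
            if (i == 1) {
                System.out.println(i + " - somestring");
            }
        }
    }
}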
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WebLogAnalysisITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WebLogAnalysisITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WebLogAnalysisITCase.java
deleted file mode 100644
index b952bc2..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WebLogAnalysisITCase.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.exampleScalaPrograms;
-
-
-import org.apache.flink.examples.scala.relational.WebLogAnalysis;
-import org.apache.flink.test.testdata.WebLogAnalysisData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-public class WebLogAnalysisITCase extends JavaProgramTestBase {
-
- private String docsPath;
- private String ranksPath;
- private String visitsPath;
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- docsPath = createTempFile("docs", WebLogAnalysisData.DOCS);
- ranksPath = createTempFile("ranks", WebLogAnalysisData.RANKS);
- visitsPath = createTempFile("visits", WebLogAnalysisData.VISITS);
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(WebLogAnalysisData.EXCEPTED_RESULT, resultPath);
- }
- @Override
- protected void testProgram() throws Exception {
- WebLogAnalysis.main(new String[]{
- "--documents", docsPath,
- "--ranks", ranksPath,
- "--visits", visitsPath,
- "--output", resultPath});
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java
deleted file mode 100644
index ce55910..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.exampleScalaPrograms;
-
-import org.apache.flink.examples.scala.wordcount.WordCount;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-public class WordCountITCase extends JavaProgramTestBase {
-
- protected String textPath;
- protected String resultPath;
-
- public WordCountITCase(){
- setParallelism(4);
- setNumTaskManagers(2);
- setTaskManagerNumSlots(2);
- }
-
- @Override
- protected void preSubmit() throws Exception {
- textPath = createTempFile("text.txt", WordCountData.TEXT);
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- WordCount.main(new String[] {
- "--input", textPath,
- "--output", resultPath });
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
deleted file mode 100644
index 10e3cb6..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.failingPrograms;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.client.JobSubmissionException;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testtasks.NoOpInvokable;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-@RunWith(Parameterized.class)
-public class JobSubmissionFailsITCase extends TestLogger {
-
- private static final int NUM_SLOTS = 20;
-
- private static LocalFlinkMiniCluster cluster;
- private static JobGraph workingJobGraph;
-
- @BeforeClass
- public static void setup() {
- try {
- Configuration config = new Configuration();
- config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS / 2);
-
- cluster = new LocalFlinkMiniCluster(config);
-
- cluster.start();
-
- final JobVertex jobVertex = new JobVertex("Working job vertex.");
- jobVertex.setInvokableClass(NoOpInvokable.class);
- workingJobGraph = new JobGraph("Working testing job", jobVertex);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @AfterClass
- public static void teardown() {
- try {
- cluster.shutdown();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- private boolean detached;
-
- public JobSubmissionFailsITCase(boolean detached) {
- this.detached = detached;
- }
-
- @Parameterized.Parameters(name = "Detached mode = {0}")
- public static Collection<Boolean[]> executionModes(){
- return Arrays.asList(new Boolean[]{false},
- new Boolean[]{true});
- }
-
- // --------------------------------------------------------------------------------------------
-
- private JobExecutionResult submitJob(JobGraph jobGraph) throws Exception {
- if (detached) {
- cluster.submitJobDetached(jobGraph);
- return null;
- }
- else {
- return cluster.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
- }
- }
-
- @Test
- public void testExceptionInInitializeOnMaster() {
- try {
- final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex");
- failingJobVertex.setInvokableClass(NoOpInvokable.class);
-
- final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex);
-
- try {
- submitJob(failingJobGraph);
- fail("Expected JobExecutionException.");
- }
- catch (JobExecutionException e) {
- assertEquals("Test exception.", e.getCause().getMessage());
- }
- catch (Throwable t) {
- t.printStackTrace();
- fail("Caught wrong exception of type " + t.getClass() + ".");
- }
-
- cluster.submitJobAndWait(workingJobGraph, false);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSubmitEmptyJobGraph() {
- try {
- final JobGraph jobGraph = new JobGraph("Testing job");
-
- try {
- submitJob(jobGraph);
- fail("Expected JobSubmissionException.");
- }
- catch (JobSubmissionException e) {
- assertTrue(e.getMessage() != null && e.getMessage().contains("empty"));
- }
- catch (Throwable t) {
- t.printStackTrace();
- fail("Caught wrong exception of type " + t.getClass() + ".");
- }
-
- cluster.submitJobAndWait(workingJobGraph, false);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSubmitNullJobGraph() {
- try {
- try {
- submitJob(null);
- fail("Expected JobSubmissionException.");
- }
- catch (NullPointerException e) {
- // yo!
- }
- catch (Throwable t) {
- t.printStackTrace();
- fail("Caught wrong exception of type " + t.getClass() + ".");
- }
-
- cluster.submitJobAndWait(workingJobGraph, false);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- public static class FailingJobVertex extends JobVertex {
- private static final long serialVersionUID = -6365291240199412135L;
-
- public FailingJobVertex(final String msg) {
- super(msg);
- }
-
- @Override
- public void initializeOnMaster(ClassLoader loader) throws Exception {
- throw new Exception("Test exception.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
deleted file mode 100644
index f6fb0dd..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.failingPrograms;
-
-import java.util.List;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Assert;
-
-/**
- *
- * Tests that both jobs, the failing and the working one, are handled correctly. The first (failing) job must be
- * canceled and the client must report the failure. The second (working) job must finish successfully and compute the
- * correct result.
- *
- */
-public class TaskFailureITCase extends JavaProgramTestBase {
-
- private static String EXCEPTION_STRING = "This is an expected Test Exception";
-
- @Override
- protected void testProgram() throws Exception {
- //test failing version
- try {
- executeTask(new FailingTestMapper(), 1);
- } catch (RuntimeException e) { //expected for collection execution
- if (!isCollectionExecution()) {
- Assert.fail();
- }
- // for collection execution, no restarts. So, exception should be appended with 0.
- Assert.assertEquals(EXCEPTION_STRING + ":0", e.getMessage());
- } catch (JobExecutionException e) { //expected for cluster execution
- if (isCollectionExecution()) {
- Assert.fail();
- }
- // for cluster execution, one restart. So, exception should be appended with 1.
- Assert.assertEquals(EXCEPTION_STRING + ":1", e.getCause().getMessage());
- }
- //test correct version
- executeTask(new TestMapper(), 0);
- }
-
-
- private void executeTask(MapFunction<Long, Long> mapper, int retries) throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(retries, 0));
- List<Long> result = env.generateSequence(1, 9)
- .map(mapper)
- .collect();
- MultipleProgramsTestBase.compareResultAsText(result, "1\n2\n3\n4\n5\n6\n7\n8\n9");
- }
-
-
- /**
- * working map function
- */
- public static class TestMapper implements MapFunction<Long, Long> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Long map(Long value) throws Exception {
- return value;
- }
- }
-
- /**
- * failing map function
- */
- public static class FailingTestMapper extends RichMapFunction<Long, Long> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Long map(Long value) throws Exception {
- throw new RuntimeException(EXCEPTION_STRING + ":" + getRuntimeContext().getAttemptNumber());
- }
- }
-}
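
The deleted test's expected ":0" / ":1" suffixes come straight from the task's attempt number, which starts at 0 and increments by one per restart. A minimal sketch of that mechanism, assuming this Flink version's DataSet API (AttemptNumberSketch is a hypothetical class, not part of the patch):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.ExecutionEnvironment;

public class AttemptNumberSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // one restart with no delay: the mapper runs as attempt 0, then once more as attempt 1
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
        env.generateSequence(1, 3)
            .map(new RichMapFunction<Long, Long>() {
                @Override
                public Long map(Long value) {
                    throw new RuntimeException("boom:" + getRuntimeContext().getAttemptNumber());
                }
            })
            .collect(); // fails with a cause message ending in ":1" once the single retry is spent
    }
}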
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java
index 468b780..07b4d76 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java
@@ -21,12 +21,13 @@ package org.apache.flink.test.hadoop.mapred;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.OperatingSystem;
+
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
@@ -49,10 +50,13 @@ import java.net.URI;
import java.util.Collection;
import java.util.LinkedList;
+/**
+ * Integration tests for Hadoop IO formats.
+ */
@RunWith(Parameterized.class)
public class HadoopIOFormatsITCase extends JavaProgramTestBase {
- private static int NUM_PROGRAMS = 2;
+ private static final int NUM_PROGRAMS = 2;
private int curProgId = config.getInteger("ProgramId", -1);
private String[] resultPath;
@@ -61,9 +65,9 @@ public class HadoopIOFormatsITCase extends JavaProgramTestBase {
private String sequenceFileInPathNull;
public HadoopIOFormatsITCase(Configuration config) {
- super(config);
+ super(config);
}
-
+
@Before
public void checkOperatingSystem() {
// FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
@@ -89,25 +93,24 @@ public class HadoopIOFormatsITCase extends JavaProgramTestBase {
Text value = new Text();
SequenceFile.Writer writer = null;
try {
- writer = SequenceFile.createWriter( fs, conf, path, key.getClass(), value.getClass());
- for (int i = 0; i < kvCount; i ++) {
- if(i == 1) {
+ writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
+ for (int i = 0; i < kvCount; i++) {
+ if (i == 1) {
// write key = 0 a bit more often.
- for(int a = 0;a < 15; a++) {
+ for (int a = 0; a < 15; a++) {
key.set(i);
- value.set(i+" - somestring");
+ value.set(i + " - somestring");
writer.append(key, value);
}
}
key.set(i);
- value.set(i+" - somestring");
+ value.set(i + " - somestring");
writer.append(key, value);
}
} finally {
IOUtils.closeStream(writer);
}
-
// ------------------ Long / Text Key Value pair: ------------
File sequenceFileNull = createAndRegisterTempFile("seqFileNullKey");
@@ -117,8 +120,8 @@ public class HadoopIOFormatsITCase extends JavaProgramTestBase {
LongWritable value1 = new LongWritable();
SequenceFile.Writer writer1 = null;
try {
- writer1 = SequenceFile.createWriter( fs, conf, path, NullWritable.class, value1.getClass());
- for (int i = 0; i < kvCount; i ++) {
+ writer1 = SequenceFile.createWriter(fs, conf, path, NullWritable.class, value1.getClass());
+ for (int i = 0; i < kvCount; i++) {
value1.set(i);
writer1.append(NullWritable.get(), value1);
}
@@ -131,32 +134,32 @@ public class HadoopIOFormatsITCase extends JavaProgramTestBase {
protected void testProgram() throws Exception {
expectedResult = HadoopIOFormatPrograms.runProgram(curProgId, resultPath, sequenceFileInPath, sequenceFileInPathNull);
}
-
+
@Override
protected void postSubmit() throws Exception {
- for(int i = 0; i < resultPath.length; i++) {
+ for (int i = 0; i < resultPath.length; i++) {
compareResultsByLinesInMemory(expectedResult[i], resultPath[i]);
}
}
-
+
@Parameters
public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
- for(int i=1; i <= NUM_PROGRAMS; i++) {
+ for (int i = 1; i <= NUM_PROGRAMS; i++) {
Configuration config = new Configuration();
config.setInteger("ProgramId", i);
tConfigs.add(config);
}
-
+
return TestBaseUtils.toParameterList(tConfigs);
}
-
- public static class HadoopIOFormatPrograms {
-
- public static String[] runProgram(int progId, String resultPath[], String sequenceFileInPath, String sequenceFileInPathNull) throws Exception {
-
+
+ private static class HadoopIOFormatPrograms {
+
+ public static String[] runProgram(int progId, String[] resultPath, String sequenceFileInPath, String sequenceFileInPathNull) throws Exception {
+
switch(progId) {
case 1: {
/**
@@ -184,7 +187,7 @@ public class HadoopIOFormatsITCase extends JavaProgramTestBase {
});
res.writeAsText(resultPath[1]);
env.execute();
-
+
// return expected result
return new String [] {"(21,3 - somestring)", "0 - somestring - 0\n" +
"1 - somestring - 1\n" +
@@ -224,8 +227,8 @@ public class HadoopIOFormatsITCase extends JavaProgramTestBase {
default:
throw new IllegalArgumentException("Invalid program id");
}
-
+
}
-
+
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
index 7c1b30e..21f6985 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
@@ -23,11 +23,11 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
-import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.test.testfunctions.Tokenizer;
+import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.OperatingSystem;
+
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
@@ -37,6 +37,11 @@ import org.apache.hadoop.mapred.TextOutputFormat;
import org.junit.Assume;
import org.junit.Before;
+import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
+
+/**
+ * Test WordCount with Hadoop input and output "mapred" (legacy) formats.
+ */
public class WordCountMapredITCase extends JavaProgramTestBase {
protected String textPath;
@@ -87,7 +92,6 @@ public class WordCountMapredITCase extends JavaProgramTestBase {
}
});
-
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
@@ -97,7 +101,6 @@ public class WordCountMapredITCase extends JavaProgramTestBase {
DataSet<Tuple2<Text, LongWritable>> words = counts.map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, LongWritable>>() {
-
@Override
public Tuple2<Text, LongWritable> map(Tuple2<String, Integer> value) throws Exception {
return new Tuple2<Text, LongWritable>(new Text(value.f0), new LongWritable(value.f1));
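
The hunks above only touch imports and Javadoc; for context, the pattern the test exercises pairs HadoopInputs.readHadoopFile with a legacy mapred HadoopOutputFormat. A minimal round-trip sketch with assumed input/output paths (MapredRoundTripSketch is a hypothetical class, not part of the patch):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;

public class MapredRoundTripSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // read lines as (byte offset, text) pairs through the legacy mapred TextInputFormat
        DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
            readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, "/tmp/in"));

        // write the pairs back out through the legacy mapred TextOutputFormat
        JobConf jobConf = new JobConf();
        HadoopOutputFormat<LongWritable, Text> outputFormat =
            new HadoopOutputFormat<>(new TextOutputFormat<LongWritable, Text>(), jobConf);
        FileOutputFormat.setOutputPath(jobConf, new Path("/tmp/out"));

        input.output(outputFormat);
        env.execute();
    }
}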
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
index fbc1994..0092fe7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
@@ -23,11 +23,11 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
-import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.test.testfunctions.Tokenizer;
+import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.OperatingSystem;
+
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
@@ -37,6 +37,11 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.junit.Assume;
import org.junit.Before;
+import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
+
+/**
+ * Test WordCount with Hadoop input and output "mapreduce" (modern) formats.
+ */
public class WordCountMapreduceITCase extends JavaProgramTestBase {
protected String textPath;
@@ -86,7 +91,6 @@ public class WordCountMapreduceITCase extends JavaProgramTestBase {
}
});
-
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
@@ -96,7 +100,6 @@ public class WordCountMapreduceITCase extends JavaProgramTestBase {
DataSet<Tuple2<Text, LongWritable>> words = counts.map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, LongWritable>>() {
-
@Override
public Tuple2<Text, LongWritable> map(Tuple2<String, Integer> value) throws Exception {
return new Tuple2<Text, LongWritable>(new Text(value.f0), new LongWritable(value.f1));
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderITCase.java
index e50dfd3..82f699d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderITCase.java
@@ -18,8 +18,6 @@
package org.apache.flink.test.io;
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple8;
@@ -32,6 +30,9 @@ import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.ShortValue;
import org.apache.flink.types.StringValue;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -42,6 +43,9 @@ import java.io.File;
import java.util.List;
import java.util.Locale;
+/**
+ * Tests for {@link ExecutionEnvironment#readCsvFile}.
+ */
@RunWith(Parameterized.class)
public class CsvReaderITCase extends MultipleProgramsTestBase {
private String expected;
@@ -122,6 +126,9 @@ public class CsvReaderITCase extends MultipleProgramsTestBase {
compareResultAsTuples(result, expected);
}
+ /**
+ * POJO.
+ */
public static class POJOItem {
public String f1;
private int f2;
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java
index cfdc31a..90611eb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java
@@ -38,7 +38,7 @@ public class InputOutputITCase extends JavaProgramTestBase {
env.createInput(new TestNonRichInputFormat()).output(output);
try {
env.execute();
- } catch(Exception e){
+ } catch (Exception e){
// we didn't break anything by making everything rich.
e.printStackTrace();
fail(e.getMessage());
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/io/RichInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/io/RichInputOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/io/RichInputOutputITCase.java
index 98f19ff..4f25bad 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/io/RichInputOutputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/io/RichInputOutputITCase.java
@@ -79,9 +79,9 @@ public class RichInputOutputITCase extends JavaProgramTestBase {
@Override
public void open(FileInputSplit split) throws IOException{
- try{
+ try {
getRuntimeContext().addAccumulator("DATA_SOURCE_ACCUMULATOR", counter);
- } catch(UnsupportedOperationException e){
+ } catch (UnsupportedOperationException e){
// the accumulator is already added
}
super.open(split);
@@ -104,9 +104,9 @@ public class RichInputOutputITCase extends JavaProgramTestBase {
@Override
public void open(int a, int b){
- try{
+ try {
getRuntimeContext().addAccumulator("DATA_SINK_ACCUMULATOR", counter);
- } catch(UnsupportedOperationException e){
+ } catch (UnsupportedOperationException e){
// the accumulator is already added
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
index d55a63f..66adbde 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
@@ -18,50 +18,49 @@
package org.apache.flink.test.iterative;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-
+import java.util.ArrayList;
+import java.util.List;
+/**
+ * Integration test for a bulk iteration with an all reduce.
+ */
@SuppressWarnings("serial")
public class BulkIterationWithAllReducerITCase extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
-
+
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
-
+
IterativeDataSet<Integer> iteration = data.iterate(10);
-
+
DataSet<Integer> result = data.reduceGroup(new PickOneAllReduce()).withBroadcastSet(iteration, "bc");
-
+
final List<Integer> resultList = new ArrayList<Integer>();
iteration.closeWith(result).output(new LocalCollectionOutputFormat<Integer>(resultList));
-
+
env.execute();
-
+
Assert.assertEquals(8, resultList.get(0).intValue());
}
-
- public static class PickOneAllReduce extends RichGroupReduceFunction<Integer, Integer> {
-
+ private static class PickOneAllReduce extends RichGroupReduceFunction<Integer, Integer> {
+
private Integer bcValue;
-
+
@Override
public void open(Configuration parameters) {
List<Integer> bc = getRuntimeContext().getBroadcastVariable("bc");
@@ -74,8 +73,8 @@ public class BulkIterationWithAllReducerITCase extends JavaProgramTestBase {
return;
}
final int x = bcValue;
-
- for (Integer y : records) {
+
+ for (Integer y : records) {
if (y > x) {
out.collect(y);
return;
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
index 2a0b004..2b53249 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
@@ -18,9 +18,6 @@
package org.apache.flink.test.iterative;
-import java.io.BufferedReader;
-import java.util.Iterator;
-
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
@@ -37,22 +34,26 @@ import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.Collector;
+import java.io.BufferedReader;
+import java.util.Iterator;
+
+/**
+ * Delta iteration test implementing the connected components algorithm with a cogroup.
+ */
public class CoGroupConnectedComponentsITCase extends JavaProgramTestBase {
-
+
private static final long SEED = 0xBADC0FFEEBEEFL;
-
+
private static final int NUM_VERTICES = 1000;
-
+
private static final int NUM_EDGES = 10000;
-
private static final int MAX_ITERATIONS = 100;
protected String verticesPath;
protected String edgesPath;
protected String resultPath;
-
-
+
@Override
protected void preSubmit() throws Exception {
verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
@@ -66,11 +67,11 @@ public class CoGroupConnectedComponentsITCase extends JavaProgramTestBase {
ConnectedComponentsData.checkOddEvenResult(reader);
}
}
-
+
// --------------------------------------------------------------------------------------------
// The test program
// --------------------------------------------------------------------------------------------
-
+
@Override
protected void testProgram() throws Exception {
@@ -111,7 +112,7 @@ public class CoGroupConnectedComponentsITCase extends JavaProgramTestBase {
@ForwardedFieldsFirst("f1->f1")
@ForwardedFieldsSecond("f0->f0")
- public static final class MinIdAndUpdate implements CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ private static final class MinIdAndUpdate implements CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
index 7f5d194..a25cf24 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
@@ -18,56 +18,58 @@
package org.apache.flink.test.iterative;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.graph.ConnectedComponents.DuplicateValue;
+import org.apache.flink.examples.java.graph.ConnectedComponents.NeighborWithComponentIDJoin;
import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.Collector;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.examples.java.graph.ConnectedComponents.DuplicateValue;
-import org.apache.flink.examples.java.graph.ConnectedComponents.NeighborWithComponentIDJoin;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Delta iteration test implementing the connected components algorithm with a cogroup.
+ */
@SuppressWarnings("serial")
public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase {
-
+
private static final long SEED = 0xBADC0FFEEBEEFL;
-
+
private static final int NUM_VERTICES = 1000;
-
+
private static final int NUM_EDGES = 10000;
-
@Override
protected void testProgram() throws Exception {
-
+
// set up execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
// read vertex and edge data
DataSet<Long> vertices = env.fromElements(ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES).split("\n"))
.map(new VertexParser());
-
+
DataSet<Tuple2<Long, Long>> edges = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n"))
.flatMap(new EdgeParser());
-
+
// assign the initial components (equal to the vertex id)
DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());
-
+
// open a delta iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
verticesWithInitialId.iterateDelta(verticesWithInitialId, 100, 0);
-
+
// apply the step logic: join with the edges, select the minimum neighbor, update if the component of the candidate is smaller
DataSet<Tuple2<Long, Long>> changes = iteration
.getWorkset().join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
@@ -76,35 +78,34 @@ public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase
// close the delta iteration (delta and new workset are identical)
DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);
-
-
+
// emit result
- List<Tuple2<Long,Long>> resutTuples = new ArrayList<Tuple2<Long,Long>>();
- result.output(new LocalCollectionOutputFormat<Tuple2<Long,Long>>(resutTuples));
-
+ List<Tuple2<Long, Long>> resutTuples = new ArrayList<>();
+ result.output(new LocalCollectionOutputFormat<>(resutTuples));
+
env.execute();
}
-
+
// --------------------------------------------------------------------------------------------
// The test program
// --------------------------------------------------------------------------------------------
-
- public static final class VertexParser extends RichMapFunction<String, Long> {
+
+ private static final class VertexParser extends RichMapFunction<String, Long> {
@Override
public Long map(String value) throws Exception {
return Long.parseLong(value);
}
}
-
- public static final class EdgeParser extends RichFlatMapFunction<String, Tuple2<Long, Long>> {
+
+ private static final class EdgeParser extends RichFlatMapFunction<String, Tuple2<Long, Long>> {
@Override
public void flatMap(String value, Collector<Tuple2<Long, Long>> out) throws Exception {
String[] parts = value.split(" ");
long v1 = Long.parseLong(parts[0]);
long v2 = Long.parseLong(parts[1]);
-
+
out.collect(new Tuple2<Long, Long>(v1, v2));
out.collect(new Tuple2<Long, Long>(v2, v1));
}
@@ -112,17 +113,17 @@ public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase
@ForwardedFieldsFirst("0")
@ForwardedFieldsSecond("0")
- public static final class MinIdAndUpdate extends RichCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
+ private static final class MinIdAndUpdate extends RichCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
@Override
public void coGroup(Iterable<Tuple2<Long, Long>> candidates, Iterable<Tuple2<Long, Long>> current, Collector<Tuple2<Long, Long>> out) {
Iterator<Tuple2<Long, Long>> iterator = current.iterator();
if (!iterator.hasNext()) {
throw new RuntimeException("Error: Id not encountered before.");
}
-
+
Tuple2<Long, Long> old = iterator.next();
-
+
long minimumComponentID = Long.MAX_VALUE;
for (Tuple2<Long, Long> candidate : candidates) {
@@ -131,7 +132,7 @@ public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase
minimumComponentID = candidateComponentID;
}
}
-
+
if (minimumComponentID < old.f1) {
old.f1 = minimumComponentID;
out.collect(old);
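
This test and the connected-components variants below all share one delta-iteration skeleton: seed the solution set and workset, apply the step logic, and close with identical delta and new workset. A minimal sketch with toy data (DeltaIterationSketch is a hypothetical class, not part of the patch):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;

public class DeltaIterationSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // (vertexId, componentId) pairs; initially every vertex is its own component
        DataSet<Tuple2<Long, Long>> initial = env.fromElements(
            new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L));

        // solution set and workset both start from the initial assignment, keyed on field 0
        DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
            initial.iterateDelta(initial, 100, 0);

        // real step logic joins the workset with edges and keeps the smaller component id;
        // this identity step never empties the workset, so the 100-round cap terminates it
        DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset();

        // close with identical delta and new workset, as the tests above do
        iteration.closeWith(changes, changes).print();
    }
}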
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java
index 61e08d4..ee5597b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java
@@ -16,11 +16,8 @@
* limitations under the License.
*/
-
package org.apache.flink.test.iterative;
-import java.io.BufferedReader;
-
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -32,16 +29,19 @@ import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.util.JavaProgramTestBase;
+import java.io.BufferedReader;
+/**
+ * Delta iteration test implementing the connected components algorithm with a join.
+ */
public class ConnectedComponentsITCase extends JavaProgramTestBase {
-
+
private static final long SEED = 0xBADC0FFEEBEEFL;
-
+
private static final int NUM_VERTICES = 1000;
-
+
private static final int NUM_EDGES = 10000;
-
protected String verticesPath;
protected String edgesPath;
protected String resultPath;
@@ -93,6 +93,11 @@ public class ConnectedComponentsITCase extends JavaProgramTestBase {
}
}
+ /**
+ * Duplicate the vertex ID into both fields of a {@link Tuple2}.
+ *
+ * @param <T> key type
+ */
public static final class DuplicateValue<T> implements MapFunction<Tuple1<T>, Tuple2<T, T>> {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
index bc4885f..e425f29 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
@@ -16,12 +16,8 @@
* limitations under the License.
*/
-
package org.apache.flink.test.iterative;
-import java.io.BufferedReader;
-import java.util.Collection;
-
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
@@ -35,25 +31,31 @@ import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.Collector;
+
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
+import java.io.BufferedReader;
+import java.util.Collection;
+
+/**
+ * Delta iteration test implementing the connected components algorithm with a
+ * cogroup and join on the solution set.
+ */
@RunWith(Parameterized.class)
public class ConnectedComponentsWithDeferredUpdateITCase extends JavaProgramTestBase {
-
+
private static final long SEED = 0xBADC0FFEEBEEFL;
-
+
private static final int NUM_VERTICES = 1000;
-
+
private static final int NUM_EDGES = 10000;
-
protected String verticesPath;
protected String edgesPath;
protected String resultPath;
-
-
+
public ConnectedComponentsWithDeferredUpdateITCase(Configuration config) {
super(config);
}
@@ -92,8 +94,8 @@ public class ConnectedComponentsWithDeferredUpdateITCase extends JavaProgramTest
.join(iteration.getSolutionSet()).where(0).equalTo(0)
.with(new UpdateComponentIdMatchNonPreserving());
- DataSet<Tuple2<Long,Long>> delta;
- if(extraMapper) {
+ DataSet<Tuple2<Long, Long>> delta;
+ if (extraMapper) {
delta = changes.map(
// ID Mapper
new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
@@ -127,14 +129,14 @@ public class ConnectedComponentsWithDeferredUpdateITCase extends JavaProgramTest
public static Collection<Object[]> getConfigurations() {
Configuration config1 = new Configuration();
config1.setBoolean("ExtraMapper", false);
-
+
Configuration config2 = new Configuration();
config2.setBoolean("ExtraMapper", true);
-
+
return toParameterList(config1, config2);
}
- public static final class UpdateComponentIdMatchNonPreserving
+ private static final class UpdateComponentIdMatchNonPreserving
implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
@@ -144,7 +146,7 @@ public class ConnectedComponentsWithDeferredUpdateITCase extends JavaProgramTest
Tuple2<Long, Long> current,
Collector<Tuple2<Long, Long>> out) throws Exception {
- if(candidate.f1 < current.f1) {
+ if (candidate.f1 < current.f1) {
out.collect(candidate);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java
index a8a28f1..ce88ab6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java
@@ -18,9 +18,6 @@
package org.apache.flink.test.iterative;
-import java.io.BufferedReader;
-
-import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
@@ -33,22 +30,24 @@ import org.apache.flink.examples.java.graph.ConnectedComponents.UndirectEdge;
import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.util.JavaProgramTestBase;
+import java.io.BufferedReader;
+
+/**
+ * Delta iteration test implementing the connected components algorithm with an object map.
+ */
@SuppressWarnings("serial")
public class ConnectedComponentsWithObjectMapITCase extends JavaProgramTestBase {
-
+
private static final long SEED = 0xBADC0FFEEBEEFL;
-
+
private static final int NUM_VERTICES = 1000;
-
+
private static final int NUM_EDGES = 10000;
-
protected String verticesPath;
protected String edgesPath;
protected String resultPath;
-
-
@Override
protected void preSubmit() throws Exception {
verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
@@ -63,26 +62,25 @@ public class ConnectedComponentsWithObjectMapITCase extends JavaProgramTestBase
}
}
-
@Override
protected void testProgram() throws Exception {
// set up execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
// read vertex and edge data
DataSet<Tuple1<Long>> vertices = env.readCsvFile(verticesPath).types(Long.class);
-
+
DataSet<Tuple2<Long, Long>> edges = env.readCsvFile(edgesPath).fieldDelimiter(" ").types(Long.class, Long.class)
.flatMap(new UndirectEdge());
-
+
// assign the initial components (equal to the vertex id)
DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new ConnectedComponentsITCase.DuplicateValue<Long>());
-
+
// open a delta iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
verticesWithInitialId.iterateDelta(verticesWithInitialId, 100, 0);
iteration.setSolutionSetUnManaged(true);
-
+
// apply the step logic: join with the edges, select the minimum neighbor, update if the component of the candidate is smaller
DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset().join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
.groupBy(0).aggregate(Aggregations.MIN, 1)
@@ -91,11 +89,11 @@ public class ConnectedComponentsWithObjectMapITCase extends JavaProgramTestBase
// close the delta iteration (delta and new workset are identical)
DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);
-
+
result.writeAsCsv(resultPath, "\n", " ");
-
+
// execute program
env.execute("Connected Components Example");
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
index c2dd434..6f35917 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
@@ -16,11 +16,8 @@
* limitations under the License.
*/
-
package org.apache.flink.test.iterative;
-import java.io.BufferedReader;
-
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -34,18 +31,19 @@ import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.Collector;
+import java.io.BufferedReader;
+
/**
* Tests a bug that prevented that the solution set can be on both sides of the match/cogroup function.
*/
public class ConnectedComponentsWithSolutionSetFirstITCase extends JavaProgramTestBase {
-
+
private static final long SEED = 0xBADC0FFEEBEEFL;
-
+
private static final int NUM_VERTICES = 1000;
-
+
private static final int NUM_EDGES = 10000;
-
protected String verticesPath;
protected String edgesPath;
protected String resultPath;
@@ -56,7 +54,7 @@ public class ConnectedComponentsWithSolutionSetFirstITCase extends JavaProgramTe
edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
resultPath = getTempFilePath("results");
}
-
+
@Override
protected void testProgram() throws Exception {
// set up execution environment
@@ -92,7 +90,6 @@ public class ConnectedComponentsWithSolutionSetFirstITCase extends JavaProgramTe
env.execute("Connected Components Example");
}
-
@Override
protected void postSubmit() throws Exception {
for (BufferedReader reader : getResultReader(resultPath)) {
@@ -105,7 +102,7 @@ public class ConnectedComponentsWithSolutionSetFirstITCase extends JavaProgramTe
// --------------------------------------------------------------------------------------------
@FunctionAnnotation.ForwardedFieldsSecond("*")
- public static final class UpdateComponentIdMatchMirrored
+ private static final class UpdateComponentIdMatchMirrored
implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
@@ -115,7 +112,7 @@ public class ConnectedComponentsWithSolutionSetFirstITCase extends JavaProgramTe
Tuple2<Long, Long> candidate,
Collector<Tuple2<Long, Long>> out) throws Exception {
- if(candidate.f1 < current.f1) {
+ if (candidate.f1 < current.f1) {
out.collect(candidate);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
index 53496e2..8f8f28b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
@@ -46,14 +46,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
+/**
+ * Implementation of PageRank accounting for "sink" vertices with 0 out-degree.
+ */
@RunWith(Parameterized.class)
@SuppressWarnings({"serial", "unchecked"})
public class DanglingPageRankITCase extends MultipleProgramsTestBase {
private static final String AGGREGATOR_NAME = "pagerank.aggregator";
-
-
+
public DanglingPageRankITCase(TestExecutionMode mode) {
super(mode);
}
@@ -61,9 +62,9 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
@Test
public void testDanglingPageRank() {
try {
- final int NUM_ITERATIONS = 25;
+ final int numIterations = 25;
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
DataSet<Tuple2<Long, Boolean>> vertices = env.fromElements(
new Tuple2<>(1L, false),
new Tuple2<>(2L, false),
@@ -78,8 +79,7 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
new PageWithLinks(4L, new long[] { 3, 2 }),
new PageWithLinks(1L, new long[] { 4, 2, 3 })
);
-
-
+
final long numVertices = vertices.count();
final long numDanglingVertices = vertices
.filter(
@@ -90,32 +90,31 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
}
})
.count();
-
-
+
DataSet<PageWithRankAndDangling> verticesWithInitialRank = vertices
.map(new MapFunction<Tuple2<Long, Boolean>, PageWithRankAndDangling>() {
-
+
@Override
public PageWithRankAndDangling map(Tuple2<Long, Boolean> value) {
return new PageWithRankAndDangling(value.f0, 1.0 / numVertices, value.f1);
}
});
-
- IterativeDataSet<PageWithRankAndDangling> iteration = verticesWithInitialRank.iterate(NUM_ITERATIONS);
+
+ IterativeDataSet<PageWithRankAndDangling> iteration = verticesWithInitialRank.iterate(numIterations);
iteration.getAggregators().registerAggregationConvergenceCriterion(
AGGREGATOR_NAME,
new PageRankStatsAggregator(),
new DiffL1NormConvergenceCriterion());
-
+
DataSet<PageWithRank> partialRanks = iteration.join(edges).where("pageId").equalTo("pageId").with(
new FlatJoinFunction<PageWithRankAndDangling, PageWithLinks, PageWithRank>() {
-
+
@Override
public void join(PageWithRankAndDangling page,
PageWithLinks links,
Collector<PageWithRank> out) {
-
+
double rankToDistribute = page.rank / (double) links.targets.length;
PageWithRank output = new PageWithRank(0L, rankToDistribute);
@@ -126,8 +125,8 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
}
}
);
-
- DataSet<PageWithRankAndDangling> newRanks =
+
+ DataSet<PageWithRankAndDangling> newRanks =
iteration.coGroup(partialRanks).where("pageId").equalTo("pageId").with(
new RichCoGroupFunction<PageWithRankAndDangling, PageWithRank, PageWithRankAndDangling>() {
@@ -136,15 +135,15 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
private final double randomJump = (1.0 - BETA) / numVertices;
private PageRankStatsAggregator aggregator;
private double danglingRankFactor;
-
+
@Override
public void open(Configuration parameters) throws Exception {
int currentIteration = getIterationRuntimeContext().getSuperstepNumber();
-
+
aggregator = getIterationRuntimeContext().getIterationAggregator(AGGREGATOR_NAME);
if (currentIteration == 1) {
- danglingRankFactor = BETA * (double) numDanglingVertices /
+ danglingRankFactor = BETA * (double) numDanglingVertices /
((double) numVertices * (double) numVertices);
} else {
PageRankStats previousAggregate = getIterationRuntimeContext()
@@ -152,12 +151,12 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
danglingRankFactor = BETA * previousAggregate.danglingRank() / (double) numVertices;
}
}
-
+
@Override
public void coGroup(Iterable<PageWithRankAndDangling> currentPages,
Iterable<PageWithRank> partialRanks,
Collector<PageWithRankAndDangling> out) {
-
+
// compute the next rank
long edges = 0;
double summedRank = 0;
@@ -166,7 +165,7 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
edges++;
}
double rank = BETA * summedRank + randomJump + danglingRankFactor;
-
+
// current rank, for stats and convergence
PageWithRankAndDangling currentPage = currentPages.iterator().next();
double currentRank = currentPage.rank;
@@ -182,16 +181,16 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
out.collect(currentPage);
}
});
-
+
List<PageWithRankAndDangling> result = iteration.closeWith(newRanks).collect();
-
+
double totalRank = 0.0;
for (PageWithRankAndDangling r : result) {
totalRank += r.rank;
assertTrue(r.pageId >= 1 && r.pageId <= 5);
assertTrue(r.pageId != 3 || r.dangling);
}
-
+
assertEquals(1.0, totalRank, 0.001);
}
catch (Exception e) {
@@ -203,25 +202,31 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
// ------------------------------------------------------------------------
// custom types
// ------------------------------------------------------------------------
-
+
+ /**
+ * POJO for page ID and rank value.
+ */
public static class PageWithRank {
-
+
public long pageId;
public double rank;
public PageWithRank() {}
-
+
public PageWithRank(long pageId, double rank) {
this.pageId = pageId;
this.rank = rank;
}
}
+ /**
+ * POJO for page ID, rank value, and whether the page is a "dangling" vertex with 0 out-degree.
+ */
public static class PageWithRankAndDangling {
public long pageId;
public double rank;
- public boolean dangling;
+ public boolean dangling;
public PageWithRankAndDangling() {}
@@ -241,6 +246,9 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
}
}
+ /**
+ * POJO for page ID and array of target page IDs.
+ */
public static class PageWithLinks {
public long pageId;
@@ -253,11 +261,14 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
this.targets = targets;
}
}
-
+
// ------------------------------------------------------------------------
// statistics
// ------------------------------------------------------------------------
+ /**
+ * PageRank statistics.
+ */
public static class PageRankStats implements Value {
private double diff;
@@ -272,7 +283,7 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
public PageRankStats(
double diff, double rank, double danglingRank,
long numDanglingVertices, long numVertices, long edges) {
-
+
this.diff = diff;
this.rank = rank;
this.danglingRank = danglingRank;
@@ -332,8 +343,8 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
"]";
}
}
-
- public static class PageRankStatsAggregator implements Aggregator<PageRankStats> {
+
+ private static class PageRankStatsAggregator implements Aggregator<PageRankStats> {
private double diff;
private double rank;
@@ -348,7 +359,7 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
}
public void aggregate(double diffDelta, double rankDelta, double danglingRankDelta, long danglingVerticesDelta,
- long verticesDelta, long edgesDelta) {
+ long verticesDelta, long edgesDelta) {
diff += diffDelta;
rank += rankDelta;
danglingRank += danglingRankDelta;
@@ -378,7 +389,7 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
}
}
- public static class DiffL1NormConvergenceCriterion implements ConvergenceCriterion<PageRankStats> {
+ private static class DiffL1NormConvergenceCriterion implements ConvergenceCriterion<PageRankStats> {
private static final double EPSILON = 0.00005;
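For context, the aggregator plus convergence-criterion pattern that DanglingPageRankITCase exercises can be reduced to the following minimal, self-contained sketch against the 1.x DataSet API used throughout this commit. The Halve and SmallDelta names are invented for illustration; only the registerAggregationConvergenceCriterion and getIterationAggregator calls mirror the test above.

import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.types.DoubleValue;

public class AggregatorConvergenceSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        IterativeDataSet<Double> iteration = env.fromElements(8.0, 4.0, 2.0).iterate(100);

        // Same registration call as in DanglingPageRankITCase: the criterion
        // sees the aggregated value after each superstep and may end the loop
        // before the maximum iteration count is reached.
        iteration.getAggregators().registerAggregationConvergenceCriterion(
                "delta.sum", new DoubleSumAggregator(), new SmallDelta());

        DataSet<Double> nextValues = iteration.map(new Halve());

        iteration.closeWith(nextValues).output(new DiscardingOutputFormat<Double>());
        env.execute();
    }

    private static final class Halve extends RichMapFunction<Double, Double> {
        @Override
        public Double map(Double value) {
            double next = value / 2.0;
            // Each parallel task contributes its local delta; Flink sums the
            // partial aggregates at the superstep barrier.
            DoubleSumAggregator agg = getIterationRuntimeContext().getIterationAggregator("delta.sum");
            agg.aggregate(Math.abs(value - next));
            return next;
        }
    }

    private static final class SmallDelta implements ConvergenceCriterion<DoubleValue> {
        @Override
        public boolean isConverged(int iteration, DoubleValue value) {
            return value.getValue() < 0.01;
        }
    }
}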
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
index caa9d37..84feacf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
@@ -18,11 +18,6 @@
package org.apache.flink.test.iterative;
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
@@ -32,9 +27,18 @@ import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.util.JavaProgramTestBase;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Test for delta iterations that do not join with the solution set.
+ */
@SuppressWarnings("serial")
public class DeltaIterationNotDependingOnSolutionSetITCase extends JavaProgramTestBase {
- private final List<Tuple2<Long, Long>> result = new ArrayList<Tuple2<Long,Long>>();
+ private final List<Tuple2<Long, Long>> result = new ArrayList<>();
@Override
protected void testProgram() throws Exception {
@@ -47,7 +51,7 @@ public class DeltaIterationNotDependingOnSolutionSetITCase extends JavaProgramTe
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 5, 1);
iteration.closeWith(iteration.getWorkset(), iteration.getWorkset().map(new TestMapper()))
- .output(new LocalCollectionOutputFormat<Tuple2<Long,Long>>(result));
+ .output(new LocalCollectionOutputFormat<Tuple2<Long, Long>>(result));
env.execute();
}
@@ -75,11 +79,11 @@ public class DeltaIterationNotDependingOnSolutionSetITCase extends JavaProgramTe
return new Tuple2<T, T>(value, value);
}
}
-
+
private static final class TestMapper extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
@Override
public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
- return new Tuple2<Long, Long>(value.f0+10, value.f1+10);
+ return new Tuple2<>(value.f0 + 10, value.f1 + 10);
}
}
}
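Stripped of the test harness, the plan shape this test verifies, a delta iteration whose next workset is computed without ever reading the solution set, can be sketched as follows. The sequence bounds and the increment mapper are invented for illustration.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;

public class WorksetOnlyDeltaIterationSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 5)
                .map(new MapFunction<Long, Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> map(Long value) {
                        return new Tuple2<>(value, value);
                    }
                });

        // At most 5 supersteps; tuple field 0 keys the solution set.
        DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
                input.iterateDelta(input, 5, 0);

        // The next workset is derived from the workset alone; the solution
        // set is never read, which is the plan shape the test verifies.
        DataSet<Tuple2<Long, Long>> nextWorkset = iteration.getWorkset()
                .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
                        return new Tuple2<>(value.f0, value.f1 + 10);
                    }
                });

        iteration.closeWith(nextWorkset, nextWorkset).print();
    }
}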
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
index 0635fe5..74e3da2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
@@ -18,30 +18,28 @@
package org.apache.flink.test.iterative;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.Collector;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
+
+import java.util.ArrayList;
+import java.util.List;
/**
- *
* Iterative Connected Components test case that recomputes only those elements
* of the solution set for which at least one dependency (in-neighbor) has changed since the last iteration.
* Requires two joins with the solution set.
- *
*/
@SuppressWarnings("serial")
public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
-
+
private static final int MAX_ITERATIONS = 20;
private static final int parallelism = 1;
@@ -53,46 +51,45 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
public DependencyConnectedComponentsITCase(){
setTaskManagerNumSlots(parallelism);
}
-
-
+
@Override
protected void preSubmit() throws Exception {
verticesInput.clear();
edgesInput.clear();
// vertices input
- verticesInput.add(new Tuple2<Long, Long>(1l,1l));
- verticesInput.add(new Tuple2<Long, Long>(2l,2l));
- verticesInput.add(new Tuple2<Long, Long>(3l,3l));
- verticesInput.add(new Tuple2<Long, Long>(4l,4l));
- verticesInput.add(new Tuple2<Long, Long>(5l,5l));
- verticesInput.add(new Tuple2<Long, Long>(6l,6l));
- verticesInput.add(new Tuple2<Long, Long>(7l,7l));
- verticesInput.add(new Tuple2<Long, Long>(8l,8l));
- verticesInput.add(new Tuple2<Long, Long>(9l,9l));
-
+ verticesInput.add(new Tuple2<>(1L, 1L));
+ verticesInput.add(new Tuple2<>(2L, 2L));
+ verticesInput.add(new Tuple2<>(3L, 3L));
+ verticesInput.add(new Tuple2<>(4L, 4L));
+ verticesInput.add(new Tuple2<>(5L, 5L));
+ verticesInput.add(new Tuple2<>(6L, 6L));
+ verticesInput.add(new Tuple2<>(7L, 7L));
+ verticesInput.add(new Tuple2<>(8L, 8L));
+ verticesInput.add(new Tuple2<>(9L, 9L));
+
// vertices input
- edgesInput.add(new Tuple2<Long, Long>(1l,2l));
- edgesInput.add(new Tuple2<Long, Long>(1l,3l));
- edgesInput.add(new Tuple2<Long, Long>(2l,3l));
- edgesInput.add(new Tuple2<Long, Long>(2l,4l));
- edgesInput.add(new Tuple2<Long, Long>(2l,1l));
- edgesInput.add(new Tuple2<Long, Long>(3l,1l));
- edgesInput.add(new Tuple2<Long, Long>(3l,2l));
- edgesInput.add(new Tuple2<Long, Long>(4l,2l));
- edgesInput.add(new Tuple2<Long, Long>(4l,6l));
- edgesInput.add(new Tuple2<Long, Long>(5l,6l));
- edgesInput.add(new Tuple2<Long, Long>(6l,4l));
- edgesInput.add(new Tuple2<Long, Long>(6l,5l));
- edgesInput.add(new Tuple2<Long, Long>(7l,8l));
- edgesInput.add(new Tuple2<Long, Long>(7l,9l));
- edgesInput.add(new Tuple2<Long, Long>(8l,7l));
- edgesInput.add(new Tuple2<Long, Long>(8l,9l));
- edgesInput.add(new Tuple2<Long, Long>(9l,7l));
- edgesInput.add(new Tuple2<Long, Long>(9l,8l));
-
+ edgesInput.add(new Tuple2<>(1L, 2L));
+ edgesInput.add(new Tuple2<>(1L, 3L));
+ edgesInput.add(new Tuple2<>(2L, 3L));
+ edgesInput.add(new Tuple2<>(2L, 4L));
+ edgesInput.add(new Tuple2<>(2L, 1L));
+ edgesInput.add(new Tuple2<>(3L, 1L));
+ edgesInput.add(new Tuple2<>(3L, 2L));
+ edgesInput.add(new Tuple2<>(4L, 2L));
+ edgesInput.add(new Tuple2<>(4L, 6L));
+ edgesInput.add(new Tuple2<>(5L, 6L));
+ edgesInput.add(new Tuple2<>(6L, 4L));
+ edgesInput.add(new Tuple2<>(6L, 5L));
+ edgesInput.add(new Tuple2<>(7L, 8L));
+ edgesInput.add(new Tuple2<>(7L, 9L));
+ edgesInput.add(new Tuple2<>(8L, 7L));
+ edgesInput.add(new Tuple2<>(8L, 9L));
+ edgesInput.add(new Tuple2<>(9L, 7L));
+ edgesInput.add(new Tuple2<>(9L, 8L));
+
resultPath = getTempDirPath("result");
-
+
expectedResult = "(1,1)\n" + "(2,1)\n" + "(3,1)\n" + "(4,1)\n" +
"(5,1)\n" + "(6,1)\n" + "(7,7)\n" + "(8,7)\n" + "(9,7)\n";
}
@@ -101,63 +98,67 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
protected void testProgram() throws Exception {
DependencyConnectedComponentsProgram.runProgram(resultPath);
}
-
+
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(expectedResult, resultPath);
}
-
private static class DependencyConnectedComponentsProgram {
-
+
public static String runProgram(String resultPath) throws Exception {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
-
+
DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);
int keyPosition = 0;
-
+
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
initialSolutionSet.iterateDelta(initialSolutionSet, MAX_ITERATIONS, keyPosition);
-
+
DataSet<Long> candidates = iteration.getWorkset().join(edges).where(0).equalTo(0)
.with(new FindCandidatesJoin())
- .groupBy(new KeySelector<Long, Long>() {
- public Long getKey(Long id) { return id; }
- }).reduceGroup(new RemoveDuplicatesReduce());
-
- DataSet<Tuple2<Long, Long>> candidatesDependencies =
+ .groupBy(new KeySelector<Long, Long>() {
+ public Long getKey(Long id) {
+ return id;
+ }
+ }).reduceGroup(new RemoveDuplicatesReduce());
+
+ DataSet<Tuple2<Long, Long>> candidatesDependencies =
candidates.join(edges)
- .where(new KeySelector<Long, Long>() {
- public Long getKey(Long id) { return id; }
- }).equalTo(new KeySelector<Tuple2<Long, Long>, Long>() {
- public Long getKey(Tuple2<Long, Long> vertexWithId)
- { return vertexWithId.f1; }
- }).with(new FindCandidatesDependenciesJoin());
-
- DataSet<Tuple2<Long, Long>> verticesWithNewComponents =
+ .where(new KeySelector<Long, Long>() {
+ public Long getKey(Long id) {
+ return id;
+ }
+ }).equalTo(new KeySelector<Tuple2<Long, Long>, Long>() {
+ public Long getKey(Tuple2<Long, Long> vertexWithId) {
+ return vertexWithId.f1;
+ }
+ }).with(new FindCandidatesDependenciesJoin());
+
+ DataSet<Tuple2<Long, Long>> verticesWithNewComponents =
candidatesDependencies.join(iteration.getSolutionSet()).where(0).equalTo(0)
.with(new NeighborWithComponentIDJoin())
.groupBy(0).reduceGroup(new MinimumReduce());
-
- DataSet<Tuple2<Long, Long>> updatedComponentId =
+
+ DataSet<Tuple2<Long, Long>> updatedComponentId =
verticesWithNewComponents.join(iteration.getSolutionSet()).where(0).equalTo(0)
.flatMap(new MinimumIdFilter());
-
+
iteration.closeWith(updatedComponentId, updatedComponentId).writeAsText(resultPath);
-
+
env.execute();
-
+
return resultPath;
}
}
-
- public static final class FindCandidatesJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long> {
-
+
+ private static final class FindCandidatesJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long> {
+
private static final long serialVersionUID = 1L;
-
+
@Override
public Long join(Tuple2<Long, Long> vertexWithCompId,
Tuple2<Long, Long> edge) throws Exception {
@@ -165,9 +166,9 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
return edge.f1;
}
}
-
- public static final class RemoveDuplicatesReduce extends RichGroupReduceFunction<Long, Long> {
-
+
+ private static final class RemoveDuplicatesReduce extends RichGroupReduceFunction<Long, Long> {
+
private static final long serialVersionUID = 1L;
@Override
@@ -175,35 +176,35 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
out.collect(values.iterator().next());
}
}
-
- public static final class FindCandidatesDependenciesJoin extends RichJoinFunction<Long, Tuple2<Long, Long>,Tuple2<Long, Long>> {
-
+
+ private static final class FindCandidatesDependenciesJoin extends RichJoinFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
private static final long serialVersionUID = 1L;
-
+
@Override
public Tuple2<Long, Long> join(Long candidateId, Tuple2<Long, Long> edge) throws Exception {
return edge;
}
}
-
- public static final class NeighborWithComponentIDJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
+
+ private static final class NeighborWithComponentIDJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
private static final long serialVersionUID = 1L;
-
+
@Override
public Tuple2<Long, Long> join(Tuple2<Long, Long> edge,
Tuple2<Long, Long> vertexWithCompId) throws Exception {
-
+
vertexWithCompId.setField(edge.f1, 0);
return vertexWithCompId;
}
}
-
- public static final class MinimumReduce extends RichGroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
+
+ private static final class MinimumReduce extends RichGroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
private static final long serialVersionUID = 1L;
final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>();
-
+
@Override
public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
Long vertexId = 0L;
@@ -223,16 +224,15 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
}
}
- public static final class MinimumIdFilter extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
-
+ private static final class MinimumIdFilter extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
+
private static final long serialVersionUID = 1L;
-
+
@Override
public void flatMap(
Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> vertexWithNewAndOldId,
- Collector<Tuple2<Long, Long>> out)
- {
- if ( vertexWithNewAndOldId.f0.f1 < vertexWithNewAndOldId.f1.f1 ) {
+ Collector<Tuple2<Long, Long>> out) {
+ if (vertexWithNewAndOldId.f0.f1 < vertexWithNewAndOldId.f1.f1) {
out.collect(vertexWithNewAndOldId.f0);
}
}
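Most of the churn in this file is checkstyle reformatting of inline KeySelectors. The layout the checkstyle rules now expect is illustrated by this minimal, self-contained sketch; the dataset contents are invented for illustration.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

public class KeySelectorStyleSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Long> candidates = env.fromElements(1L, 2L);
        DataSet<Tuple2<Long, Long>> edges = env.fromElements(
                new Tuple2<>(1L, 2L),
                new Tuple2<>(2L, 3L));

        // Checkstyle-conforming layout: one statement per line inside the
        // anonymous KeySelector, with braces on their own lines.
        candidates.join(edges)
                .where(new KeySelector<Long, Long>() {
                    @Override
                    public Long getKey(Long id) {
                        return id;
                    }
                })
                .equalTo(new KeySelector<Tuple2<Long, Long>, Long>() {
                    @Override
                    public Long getKey(Tuple2<Long, Long> edge) {
                        return edge.f0;
                    }
                })
                .print();
    }
}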
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/EmptyWorksetIterationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/EmptyWorksetIterationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/EmptyWorksetIterationITCase.java
index 3f02064..f2548a9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/EmptyWorksetIterationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/EmptyWorksetIterationITCase.java
@@ -18,9 +18,6 @@
package org.apache.flink.test.iterative;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -29,31 +26,37 @@ import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.util.JavaProgramTestBase;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test for an empty (identity) delta iteration.
+ */
@SuppressWarnings("serial")
public class EmptyWorksetIterationITCase extends JavaProgramTestBase {
-
+
private List<Tuple2<Long, Long>> result = new ArrayList<Tuple2<Long, Long>>();
-
+
@Override
protected void testProgram() throws Exception {
-
+
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 20).map(new Dupl());
-
+
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter = input.iterateDelta(input, 20, 0);
iter.closeWith(iter.getWorkset(), iter.getWorkset())
.output(new LocalCollectionOutputFormat<Tuple2<Long, Long>>(result));
-
+
env.execute();
}
- public static final class Dupl implements MapFunction<Long, Tuple2<Long, Long>> {
+ private static final class Dupl implements MapFunction<Long, Tuple2<Long, Long>> {
@Override
public Tuple2<Long, Long> map(Long value) {
return new Tuple2<Long, Long>(value, value);
}
-
+
}
}
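Several of these tests collect results via LocalCollectionOutputFormat instead of temp files. In isolation the pattern looks like the following sketch (sequence bounds invented for illustration); it is only valid for local execution, since the sink writes into a driver-side collection.

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;

import java.util.ArrayList;
import java.util.List;

public class LocalCollectionSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The collection lives in the driver JVM and is filled by the time
        // execute() returns.
        List<Long> result = new ArrayList<>();
        env.generateSequence(1, 3).output(new LocalCollectionOutputFormat<Long>(result));

        env.execute();
        System.out.println(result); // 1, 2, 3 in some order
    }
}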
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/IdentityIterationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IdentityIterationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IdentityIterationITCase.java
index c422aa4..9e9b5ab 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IdentityIterationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IdentityIterationITCase.java
@@ -18,35 +18,38 @@
package org.apache.flink.test.iterative;
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.test.util.JavaProgramTestBase;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for an empty (identity) bulk iteration.
+ */
public class IdentityIterationITCase extends JavaProgramTestBase {
private List<Long> result = new ArrayList<Long>();
-
+
@Override
protected void testProgram() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
IterativeDataSet<Long> iteration = env.generateSequence(1, 10).iterate(100);
iteration.closeWith(iteration)
.output(new LocalCollectionOutputFormat<Long>(result));
-
+
env.execute();
}
-
+
@Override
protected void postSubmit() {
assertEquals(10, result.size());
-
+
long sum = 0;
for (Long l : result) {
sum += l;
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteDynamicPathConsumptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteDynamicPathConsumptionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteDynamicPathConsumptionITCase.java
index 48788a5..458e453 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteDynamicPathConsumptionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteDynamicPathConsumptionITCase.java
@@ -26,16 +26,20 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.test.util.JavaProgramTestBase;
+/**
+ * Test in which the data is constructed such that the merge-join zig-zag
+ * has an early out, leaving elements on the dynamic path input unconsumed.
+ */
@SuppressWarnings("serial")
public class IterationIncompleteDynamicPathConsumptionITCase extends JavaProgramTestBase {
-
+
@Override
protected void testProgram() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
// the test data is constructed such that the merge join zig zag
// has an early out, leaving elements on the dynamic path input unconsumed
-
+
DataSet<Path> edges = env.fromElements(
new Path(1, 2),
new Path(1, 4),
@@ -46,24 +50,24 @@ public class IterationIncompleteDynamicPathConsumptionITCase extends JavaProgram
new Path(3, 14),
new Path(3, 16),
new Path(1, 18),
- new Path(1, 20) );
-
+ new Path(1, 20));
+
IterativeDataSet<Path> currentPaths = edges.iterate(10);
-
+
DataSet<Path> newPaths = currentPaths
.join(edges, JoinHint.REPARTITION_SORT_MERGE).where("to").equalTo("from")
.with(new PathConnector())
.union(currentPaths).distinct("from", "to");
-
+
DataSet<Path> result = currentPaths.closeWith(newPaths);
-
+
result.output(new DiscardingOutputFormat<Path>());
-
+
env.execute();
}
-
+
private static class PathConnector implements JoinFunction<Path, Path, Path> {
-
+
@Override
public Path join(Path path, Path edge) {
return new Path(path.from, edge.to);
@@ -71,19 +75,22 @@ public class IterationIncompleteDynamicPathConsumptionITCase extends JavaProgram
}
// --------------------------------------------------------------------------------------------
-
+
+ /**
+ * Simple POJO representing a path from one vertex to another.
+ */
public static class Path {
-
+
public long from;
public long to;
-
+
public Path() {}
-
+
public Path(long from, long to) {
this.from = from;
this.to = to;
}
-
+
@Override
public String toString() {
return "(" + from + "," + to + ")";
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteStaticPathConsumptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteStaticPathConsumptionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteStaticPathConsumptionITCase.java
index b42e86b..f7cd111 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteStaticPathConsumptionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteStaticPathConsumptionITCase.java
@@ -26,16 +26,20 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.test.util.JavaProgramTestBase;
+/**
+ * Test in which the data is constructed such that the merge-join zig-zag
+ * has an early out, leaving elements on the static path input unconsumed.
+ */
@SuppressWarnings("serial")
public class IterationIncompleteStaticPathConsumptionITCase extends JavaProgramTestBase {
-
+
@Override
protected void testProgram() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
// the test data is constructed such that the merge join zig zag
// has an early out, leaving elements on the static path input unconsumed
-
+
DataSet<Path> edges = env.fromElements(
new Path(2, 1),
new Path(4, 1),
@@ -46,24 +50,24 @@ public class IterationIncompleteStaticPathConsumptionITCase extends JavaProgramT
new Path(14, 3),
new Path(16, 3),
new Path(18, 1),
- new Path(20, 1) );
-
+ new Path(20, 1));
+
IterativeDataSet<Path> currentPaths = edges.iterate(10);
-
+
DataSet<Path> newPaths = currentPaths
.join(edges, JoinHint.REPARTITION_SORT_MERGE).where("to").equalTo("from")
.with(new PathConnector())
.union(currentPaths).distinct("from", "to");
-
+
DataSet<Path> result = currentPaths.closeWith(newPaths);
-
+
result.output(new DiscardingOutputFormat<Path>());
-
+
env.execute();
}
-
+
private static class PathConnector implements JoinFunction<Path, Path, Path> {
-
+
@Override
public Path join(Path path, Path edge) {
return new Path(path.from, edge.to);
@@ -71,19 +75,22 @@ public class IterationIncompleteStaticPathConsumptionITCase extends JavaProgramT
}
// --------------------------------------------------------------------------------------------
-
+
+ /**
+ * Simple POJO representing a path from one vertex to another.
+ */
public static class Path {
-
+
public long from;
public long to;
-
+
public Path() {}
-
+
public Path(long from, long to) {
this.from = from;
this.to = to;
}
-
+
@Override
public String toString() {
return "(" + from + "," + to + ")";