You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/09/11 14:25:25 UTC
[flink] 01/03: [hotfix] Fix checkstyle violations in JobGraph and JobGraphTest
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6a5987fd21f4de29cc4dfed0a864495c60cdb96e
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Mon Aug 31 16:31:46 2020 +0800
[hotfix] Fix checkstyle violations in JobGraph and JobGraphTest
---
.../apache/flink/runtime/jobgraph/JobGraph.java | 10 ++--
.../flink/runtime/jobgraph/JobGraphTest.java | 65 +++++++++++-----------
2 files changed, 39 insertions(+), 36 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 1125367..95a9a28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -74,15 +74,15 @@ public class JobGraph implements Serializable {
/** Name of this job. */
private final String jobName;
- /** The mode in which the job is scheduled */
+ /** The mode in which the job is scheduled. */
private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
// --- checkpointing ---
- /** Job specific execution config */
+ /** Job specific execution config. */
private SerializedValue<ExecutionConfig> serializedExecutionConfig;
- /** The settings for the job checkpoints */
+ /** The settings for the job checkpoints. */
private JobCheckpointingSettings snapshotSettings;
/** Savepoint restore settings. */
@@ -210,7 +210,7 @@ public class JobGraph implements Serializable {
}
/**
- * Returns the {@link ExecutionConfig}
+ * Returns the {@link ExecutionConfig}.
*
* @return ExecutionConfig
*/
@@ -321,7 +321,7 @@ public class JobGraph implements Serializable {
}
/**
- * Checks if the checkpointing was enabled for this job graph
+ * Checks if the checkpointing was enabled for this job graph.
*
* @return true if checkpointing enabled
*/
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index a0c751c..e1c8d5a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -41,19 +41,22 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+/**
+ * Tests for {@link JobGraph}.
+ */
public class JobGraphTest extends TestLogger {
@Test
public void testSerialization() {
try {
JobGraph jg = new JobGraph("The graph");
-
+
// add some configuration values
{
jg.getJobConfiguration().setString("some key", "some value");
jg.getJobConfiguration().setDouble("Life of ", Math.PI);
}
-
+
// add some vertices
{
JobVertex source1 = new JobVertex("source1");
@@ -61,12 +64,12 @@ public class JobGraphTest extends TestLogger {
JobVertex target = new JobVertex("target");
target.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
target.connectNewDataSetAsInput(source2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
-
+
jg.addVertex(source1);
jg.addVertex(source2);
jg.addVertex(target);
}
-
+
// de-/serialize and compare
JobGraph copy = CommonTestUtils.createCopySerializable(jg);
@@ -74,7 +77,7 @@ public class JobGraphTest extends TestLogger {
assertEquals(jg.getJobID(), copy.getJobID());
assertEquals(jg.getJobConfiguration(), copy.getJobConfiguration());
assertEquals(jg.getNumberOfVertices(), copy.getNumberOfVertices());
-
+
for (JobVertex vertex : copy.getVertices()) {
JobVertex original = jg.findVertexByID(vertex.getID());
assertNotNull(original);
@@ -88,7 +91,7 @@ public class JobGraphTest extends TestLogger {
fail(e.getMessage());
}
}
-
+
@Test
public void testTopologicalSort1() {
try {
@@ -98,7 +101,7 @@ public class JobGraphTest extends TestLogger {
JobVertex target2 = new JobVertex("target2");
JobVertex intermediate1 = new JobVertex("intermediate1");
JobVertex intermediate2 = new JobVertex("intermediate2");
-
+
target1.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
target2.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
target2.connectNewDataSetAsInput(intermediate2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
@@ -108,9 +111,9 @@ public class JobGraphTest extends TestLogger {
JobGraph graph = new JobGraph("TestGraph",
source1, source2, intermediate1, intermediate2, target1, target2);
List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
-
+
assertEquals(6, sorted.size());
-
+
assertBefore(source1, target1, sorted);
assertBefore(source1, target2, sorted);
assertBefore(source2, target2, sorted);
@@ -124,7 +127,7 @@ public class JobGraphTest extends TestLogger {
fail(e.getMessage());
}
}
-
+
@Test
public void testTopologicalSort2() {
try {
@@ -135,41 +138,41 @@ public class JobGraphTest extends TestLogger {
JobVertex l12 = new JobVertex("layer 1 - 2");
JobVertex l13 = new JobVertex("layer 1 - 3");
JobVertex l2 = new JobVertex("layer 2");
-
+
root.connectNewDataSetAsInput(l13, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
root.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
root.connectNewDataSetAsInput(l2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
-
+
l2.connectNewDataSetAsInput(l11, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
l2.connectNewDataSetAsInput(l12, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
-
+
l11.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
-
+
l12.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
l12.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
-
+
l13.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
JobGraph graph = new JobGraph("TestGraph",
source1, source2, root, l11, l13, l12, l2);
List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
-
+
assertEquals(7, sorted.size());
-
+
assertBefore(source1, root, sorted);
assertBefore(source2, root, sorted);
assertBefore(l11, root, sorted);
assertBefore(l12, root, sorted);
assertBefore(l13, root, sorted);
assertBefore(l2, root, sorted);
-
+
assertBefore(l11, l2, sorted);
assertBefore(l12, l2, sorted);
assertBefore(l2, root, sorted);
-
+
assertBefore(source1, l2, sorted);
assertBefore(source2, l2, sorted);
-
+
assertBefore(source2, l13, sorted);
}
catch (Exception e) {
@@ -177,7 +180,7 @@ public class JobGraphTest extends TestLogger {
fail(e.getMessage());
}
}
-
+
@Test
public void testTopologicalSort3() {
// --> op1 --
@@ -185,13 +188,13 @@ public class JobGraphTest extends TestLogger {
// (source) - +-> op2 -> op3
// \ /
// ---------
-
+
try {
JobVertex source = new JobVertex("source");
JobVertex op1 = new JobVertex("op4");
JobVertex op2 = new JobVertex("op2");
JobVertex op3 = new JobVertex("op3");
-
+
op1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
op2.connectNewDataSetAsInput(op1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
op2.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
@@ -199,9 +202,9 @@ public class JobGraphTest extends TestLogger {
JobGraph graph = new JobGraph("TestGraph", source, op1, op2, op3);
List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
-
+
assertEquals(4, sorted.size());
-
+
assertBefore(source, op1, sorted);
assertBefore(source, op2, sorted);
assertBefore(op1, op2, sorted);
@@ -212,7 +215,7 @@ public class JobGraphTest extends TestLogger {
fail(e.getMessage());
}
}
-
+
@Test
public void testTopoSortCyclicGraphNoSources() {
try {
@@ -220,7 +223,7 @@ public class JobGraphTest extends TestLogger {
JobVertex v2 = new JobVertex("2");
JobVertex v3 = new JobVertex("3");
JobVertex v4 = new JobVertex("4");
-
+
v1.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
@@ -240,17 +243,17 @@ public class JobGraphTest extends TestLogger {
fail(e.getMessage());
}
}
-
+
@Test
public void testTopoSortCyclicGraphIntermediateCycle() {
- try{
+ try {
JobVertex source = new JobVertex("source");
JobVertex v1 = new JobVertex("1");
JobVertex v2 = new JobVertex("2");
JobVertex v3 = new JobVertex("3");
JobVertex v4 = new JobVertex("4");
JobVertex target = new JobVertex("target");
-
+
v1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
v1.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
@@ -272,7 +275,7 @@ public class JobGraphTest extends TestLogger {
fail(e.getMessage());
}
}
-
+
private static final void assertBefore(JobVertex v1, JobVertex v2, List<JobVertex> list) {
boolean seenFirst = false;
for (JobVertex v : list) {