You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/21 11:09:12 UTC
[2/6] flink git commit: [hotfix] [jobmanager] Minor code cleanups in
JobGraph and CheckpointCoordinator
[hotfix] [jobmanager] Minor code cleanups in JobGraph and CheckpointCoordinator
This makes the exception that can occur during serialization of the ExecutionConfig explicit,
and adds some comments to JobGraph.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f63426b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f63426b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f63426b0
Branch: refs/heads/master
Commit: f63426b0322e05fd0986ae5f224a69b1320724f6
Parents: 50fd1a3
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 16 18:34:51 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 20 19:43:15 2017 +0100
----------------------------------------------------------------------
.../plantranslate/JobGraphGenerator.java | 9 ++-
.../checkpoint/CheckpointCoordinator.java | 2 +-
.../apache/flink/runtime/jobgraph/JobGraph.java | 84 +++++++++++---------
.../LeaderChangeJobRecoveryTest.java | 8 +-
.../runtime/minicluster/MiniClusterITCase.java | 4 +-
.../api/graph/StreamingJobGraphGenerator.java | 10 ++-
6 files changed, 69 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 6f7b04a..caeb43f 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -83,6 +83,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.Visitor;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -223,7 +224,13 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
// create the job graph object
JobGraph graph = new JobGraph(jobId, program.getJobName());
- graph.setExecutionConfig(program.getOriginalPlan().getExecutionConfig());
+ try {
+ graph.setExecutionConfig(program.getOriginalPlan().getExecutionConfig());
+ }
+ catch (IOException e) { // chain 'e' below so the serialization failure stays visible in the stack trace
+ throw new CompilerException("Could not serialize the ExecutionConfig. " +
+ "This indicates that non-serializable types (like custom serializers) were registered", e);
+ }
graph.setAllowQueuedScheduling(false);
graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout());
http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 78cad91..6cac006 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -623,7 +623,7 @@ public class CheckpointCoordinator {
* @return Flag indicating whether the ack'd checkpoint was associated
* with a pending checkpoint.
*
- * @throws Exception If the checkpoint cannot be added to the completed checkpoint store.
+ * @throws CheckpointException If the checkpoint cannot be added to the completed checkpoint store.
*/
public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
if (shutdown || message == null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 6db9277..f6377e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -53,18 +53,16 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*
* <p>The JobGraph is a graph of vertices and intermediate results that are connected together to
* form a DAG. Note that iterations (feedback edges) are currently not encoded inside the JobGraph
- * but inside certain special vertices that establish the feedback channel amongst themselves.</p>
+ * but inside certain special vertices that establish the feedback channel amongst themselves.
*
* <p>The JobGraph defines the job-wide configuration settings, while each vertex and intermediate result
- * define the characteristics of the concrete operation and intermediate data.</p>
+ * define the characteristics of the concrete operation and intermediate data.
*/
public class JobGraph implements Serializable {
private static final long serialVersionUID = 1L;
- // --------------------------------------------------------------------------------------------
- // Members that define the structure / topology of the graph
- // --------------------------------------------------------------------------------------------
+ // --- job and configuration ---
/** List of task vertices included in this job graph. */
private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();
@@ -72,12 +70,6 @@ public class JobGraph implements Serializable {
/** The job configuration attached to this job. */
private final Configuration jobConfiguration = new Configuration();
- /** Set of JAR files required to run this job. */
- private final List<Path> userJars = new ArrayList<Path>();
-
- /** Set of blob keys identifying the JAR files required to run this job. */
- private final List<BlobKey> userJarBlobKeys = new ArrayList<BlobKey>();
-
/** ID of this job. May be set if specific job id is desired (e.g. session management) */
private final JobID jobID;
@@ -94,18 +86,28 @@ public class JobGraph implements Serializable {
/** The mode in which the job is scheduled */
private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
- /** The settings for asynchronous snapshots */
- private JobSnapshottingSettings snapshotSettings;
-
- /** List of classpaths required to run this job. */
- private List<URL> classpaths = Collections.emptyList();
+ // --- checkpointing ---
/** Job specific execution config */
private SerializedValue<ExecutionConfig> serializedExecutionConfig;
+ /** The settings for the job checkpoints */
+ private JobSnapshottingSettings snapshotSettings;
+
/** Savepoint restore settings. */
private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
+ // --- attached resources ---
+
+ /** Set of JAR files required to run this job. */
+ private final List<Path> userJars = new ArrayList<Path>();
+
+ /** Set of blob keys identifying the JAR files required to run this job. */
+ private final List<BlobKey> userJarBlobKeys = new ArrayList<BlobKey>();
+
+ /** List of classpaths required to run this job. */
+ private List<URL> classpaths = Collections.emptyList();
+
// --------------------------------------------------------------------------------------------
/**
@@ -129,7 +131,13 @@ public class JobGraph implements Serializable {
public JobGraph(JobID jobId, String jobName) {
this.jobID = jobId == null ? new JobID() : jobId;
this.jobName = jobName == null ? "(unnamed job)" : jobName;
- setExecutionConfig(new ExecutionConfig());
+
+ try {
+ setExecutionConfig(new ExecutionConfig());
+ } catch (IOException e) {
+ // not expected for a freshly created ExecutionConfig; chain the cause so a real bug stays diagnosable
+ throw new RuntimeException("bug, empty execution config is not serializable", e);
+ }
}
/**
@@ -260,17 +268,16 @@ public class JobGraph implements Serializable {
}
/**
- * Sets a serialized copy of the passed ExecutionConfig. Further modification of the referenced ExecutionConfig
- * object will not affect this serialized copy.
+ * Sets the execution config. This method eagerly serializes the ExecutionConfig for future RPC
+ * transport. Further modification of the referenced ExecutionConfig object will not affect
+ * this serialized copy.
+ *
* @param executionConfig The ExecutionConfig to be serialized.
+ * @throws IOException Thrown if the serialization of the ExecutionConfig fails
*/
- public void setExecutionConfig(ExecutionConfig executionConfig) {
+ public void setExecutionConfig(ExecutionConfig executionConfig) throws IOException {
checkNotNull(executionConfig, "ExecutionConfig must not be null.");
- try {
- this.serializedExecutionConfig = new SerializedValue<>(executionConfig);
- } catch (IOException e) {
- throw new RuntimeException("Could not serialize ExecutionConfig.", e);
- }
+ this.serializedExecutionConfig = new SerializedValue<>(executionConfig);
}
/**
@@ -362,6 +369,21 @@ public class JobGraph implements Serializable {
return classpaths;
}
+ /**
+ * Gets the maximum parallelism of all operations in this job graph.
+ *
+ * @return The maximum parallelism of this job graph
+ */
+ public int getMaximumParallelism() {
+ int maxParallelism = -1;
+ for (JobVertex vertex : taskVertices.values()) {
+ maxParallelism = Math.max(vertex.getParallelism(), maxParallelism);
+ }
+ return maxParallelism;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Topological Graph Access
// --------------------------------------------------------------------------------------------
public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException {
@@ -539,18 +561,6 @@ public class JobGraph implements Serializable {
}
/**
- * Gets the maximum parallelism of all operations in this job graph.
- * @return The maximum parallelism of this job graph
- */
- public int getMaximumParallelism() {
- int maxParallelism = -1;
- for (JobVertex vertex : taskVertices.values()) {
- maxParallelism = Math.max(vertex.getParallelism(), maxParallelism);
- }
- return maxParallelism;
- }
-
- /**
* Uploads the previously added user JAR files to the job manager through
* the job manager's BLOB server. The respective port is retrieved from the
* JobManager. This function issues a blocking call.
http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index be26e7b..fe33022 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.leaderelection;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -136,11 +135,6 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
sender.setSlotSharingGroup(slotSharingGroup);
receiver.setSlotSharingGroup(slotSharingGroup);
- ExecutionConfig executionConfig = new ExecutionConfig();
-
- JobGraph jobGraph = new JobGraph("Blocking test job", sender, receiver);
- jobGraph.setExecutionConfig(executionConfig);
-
- return jobGraph;
+ return new JobGraph("Blocking test job", sender, receiver);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index f656622..f90367c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -29,6 +29,8 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import java.io.IOException;
+
/**
* Integration test cases for the {@link MiniCluster}.
*/
@@ -95,7 +97,7 @@ public class MiniClusterITCase extends TestLogger {
miniCluster.runJobBlocking(job);
}
- private static JobGraph getSimpleJob() {
+ private static JobGraph getSimpleJob() throws IOException {
JobVertex task = new JobVertex("Test task");
task.setParallelism(1);
task.setMaxParallelism(1);
http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 8877c80..a4bb165 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.migration.streaming.api.graph.StreamGraphHasherV1;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -50,6 +51,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -129,7 +131,13 @@ public class StreamingJobGraphGenerator {
configureCheckpointing();
// set the ExecutionConfig last when it has been finalized
- jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
+ try {
+ jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
+ }
+ catch (IOException e) { // chain 'e' below so the serialization failure stays visible in the stack trace
+ throw new IllegalConfigurationException("Could not serialize the ExecutionConfig. " +
+ "This indicates that non-serializable types (like custom serializers) were registered", e);
+ }
return jobGraph;
}