You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/21 11:09:12 UTC

[2/6] flink git commit: [hotfix] [jobmanager] Minor code cleanups in JobGraph and CheckpointCoordinator

[hotfix] [jobmanager] Minor code cleanups in JobGraph and CheckpointCoordinator

This makes the exception that can occur during serialization of the ExecutionConfig explicit,
and adds some comments to JobGraph.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f63426b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f63426b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f63426b0

Branch: refs/heads/master
Commit: f63426b0322e05fd0986ae5f224a69b1320724f6
Parents: 50fd1a3
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 16 18:34:51 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 20 19:43:15 2017 +0100

----------------------------------------------------------------------
 .../plantranslate/JobGraphGenerator.java        |  9 ++-
 .../checkpoint/CheckpointCoordinator.java       |  2 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java | 84 +++++++++++---------
 .../LeaderChangeJobRecoveryTest.java            |  8 +-
 .../runtime/minicluster/MiniClusterITCase.java  |  4 +-
 .../api/graph/StreamingJobGraphGenerator.java   | 10 ++-
 6 files changed, 69 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 6f7b04a..caeb43f 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -83,6 +83,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.Visitor;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -223,7 +224,13 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 
 		// create the job graph object
 		JobGraph graph = new JobGraph(jobId, program.getJobName());
-		graph.setExecutionConfig(program.getOriginalPlan().getExecutionConfig());
+		try {
+			graph.setExecutionConfig(program.getOriginalPlan().getExecutionConfig());
+		}
+		catch (IOException e) {
+			throw new CompilerException("Could not serialize the ExecutionConfig. " +
+					"This indicates that non-serializable types (like custom serializers) were registered", e);
+		}
 
 		graph.setAllowQueuedScheduling(false);
 		graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout());

http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 78cad91..6cac006 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -623,7 +623,7 @@ public class CheckpointCoordinator {
 	 * @return Flag indicating whether the ack'd checkpoint was associated
 	 * with a pending checkpoint.
 	 *
-	 * @throws Exception If the checkpoint cannot be added to the completed checkpoint store.
+	 * @throws CheckpointException If the checkpoint cannot be added to the completed checkpoint store.
 	 */
 	public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
 		if (shutdown || message == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 6db9277..f6377e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -53,18 +53,16 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>The JobGraph is a graph of vertices and intermediate results that are connected together to
  * form a DAG. Note that iterations (feedback edges) are currently not encoded inside the JobGraph
- * but inside certain special vertices that establish the feedback channel amongst themselves.</p>
+ * but inside certain special vertices that establish the feedback channel amongst themselves.
  *
  * <p>The JobGraph defines the job-wide configuration settings, while each vertex and intermediate result
- * define the characteristics of the concrete operation and intermediate data.</p>
+ * define the characteristics of the concrete operation and intermediate data.
  */
 public class JobGraph implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 
-	// --------------------------------------------------------------------------------------------
-	// Members that define the structure / topology of the graph
-	// --------------------------------------------------------------------------------------------
+	// --- job and configuration ---
 
 	/** List of task vertices included in this job graph. */
 	private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();
@@ -72,12 +70,6 @@ public class JobGraph implements Serializable {
 	/** The job configuration attached to this job. */
 	private final Configuration jobConfiguration = new Configuration();
 
-	/** Set of JAR files required to run this job. */
-	private final List<Path> userJars = new ArrayList<Path>();
-
-	/** Set of blob keys identifying the JAR files required to run this job. */
-	private final List<BlobKey> userJarBlobKeys = new ArrayList<BlobKey>();
-
 	/** ID of this job. May be set if specific job id is desired (e.g. session management) */
 	private final JobID jobID;
 
@@ -94,18 +86,28 @@ public class JobGraph implements Serializable {
 	/** The mode in which the job is scheduled */
 	private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
 
-	/** The settings for asynchronous snapshots */
-	private JobSnapshottingSettings snapshotSettings;
-
-	/** List of classpaths required to run this job. */
-	private List<URL> classpaths = Collections.emptyList();
+	// --- execution config and checkpointing ---
 
 	/** Job specific execution config */
 	private SerializedValue<ExecutionConfig> serializedExecutionConfig;
 
+	/** The settings for the job checkpoints */
+	private JobSnapshottingSettings snapshotSettings;
+
 	/** Savepoint restore settings. */
 	private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
 
+	// --- attached resources ---
+
+	/** Set of JAR files required to run this job. */
+	private final List<Path> userJars = new ArrayList<Path>();
+
+	/** Set of blob keys identifying the JAR files required to run this job. */
+	private final List<BlobKey> userJarBlobKeys = new ArrayList<BlobKey>();
+
+	/** List of classpaths required to run this job. */
+	private List<URL> classpaths = Collections.emptyList();
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -129,7 +131,13 @@ public class JobGraph implements Serializable {
 	public JobGraph(JobID jobId, String jobName) {
 		this.jobID = jobId == null ? new JobID() : jobId;
 		this.jobName = jobName == null ? "(unnamed job)" : jobName;
-		setExecutionConfig(new ExecutionConfig());
+
+		try {
+			setExecutionConfig(new ExecutionConfig());
+		} catch (IOException e) {
+			// this should never happen, since an empty execution config is always serializable
+			throw new RuntimeException("bug, empty execution config is not serializable", e);
+		}
 	}
 
 	/**
@@ -260,17 +268,16 @@ public class JobGraph implements Serializable {
 	}
 
 	/**
-	 * Sets a serialized copy of the passed ExecutionConfig. Further modification of the referenced ExecutionConfig
-	 * object will not affect this serialized copy.
+	 * Sets the execution config. This method eagerly serializes the ExecutionConfig for future RPC
+	 * transport. Further modification of the referenced ExecutionConfig object will not affect
+	 * this serialized copy.
+	 *
 	 * @param executionConfig The ExecutionConfig to be serialized.
+	 * @throws IOException Thrown if the serialization of the ExecutionConfig fails
 	 */
-	public void setExecutionConfig(ExecutionConfig executionConfig) {
+	public void setExecutionConfig(ExecutionConfig executionConfig) throws IOException {
 		checkNotNull(executionConfig, "ExecutionConfig must not be null.");
-		try {
-			this.serializedExecutionConfig = new SerializedValue<>(executionConfig);
-		} catch (IOException e) {
-			throw new RuntimeException("Could not serialize ExecutionConfig.", e);
-		}
+		this.serializedExecutionConfig = new SerializedValue<>(executionConfig);
 	}
 
 	/**
@@ -362,6 +369,21 @@ public class JobGraph implements Serializable {
 		return classpaths;
 	}
 
+	/**
+	 * Gets the maximum parallelism of all operations in this job graph.
+	 *
+	 * @return The maximum parallelism of this job graph
+	 */
+	public int getMaximumParallelism() {
+		int maxParallelism = -1;
+		for (JobVertex vertex : taskVertices.values()) {
+			maxParallelism = Math.max(vertex.getParallelism(), maxParallelism);
+		}
+		return maxParallelism;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Topological Graph Access
 	// --------------------------------------------------------------------------------------------
 
 	public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException {
@@ -539,18 +561,6 @@ public class JobGraph implements Serializable {
 	}
 
 	/**
-	 * Gets the maximum parallelism of all operations in this job graph.
-	 * @return The maximum parallelism of this job graph
-	 */
-	public int getMaximumParallelism() {
-		int maxParallelism = -1;
-		for (JobVertex vertex : taskVertices.values()) {
-			maxParallelism = Math.max(vertex.getParallelism(), maxParallelism);
-		}
-		return maxParallelism;
-	}
-
-	/**
 	 * Uploads the previously added user JAR files to the job manager through
 	 * the job manager's BLOB server. The respective port is retrieved from the
 	 * JobManager. This function issues a blocking call.

http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index be26e7b..fe33022 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.leaderelection;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -136,11 +135,6 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 		sender.setSlotSharingGroup(slotSharingGroup);
 		receiver.setSlotSharingGroup(slotSharingGroup);
 
-		ExecutionConfig executionConfig = new ExecutionConfig();
-
-		JobGraph jobGraph = new JobGraph("Blocking test job", sender, receiver);
-		jobGraph.setExecutionConfig(executionConfig);
-
-		return jobGraph;
+		return new JobGraph("Blocking test job", sender, receiver);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index f656622..f90367c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -29,6 +29,8 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.io.IOException;
+
 /**
  * Integration test cases for the {@link MiniCluster}.
  */
@@ -95,7 +97,7 @@ public class MiniClusterITCase extends TestLogger {
 		miniCluster.runJobBlocking(job);
 	}
 
-	private static JobGraph getSimpleJob() {
+	private static JobGraph getSimpleJob() throws IOException {
 		JobVertex task = new JobVertex("Test task");
 		task.setParallelism(1);
 		task.setMaxParallelism(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 8877c80..a4bb165 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.migration.streaming.api.graph.StreamGraphHasherV1;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -50,6 +51,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -129,7 +131,13 @@ public class StreamingJobGraphGenerator {
 		configureCheckpointing();
 
 		// set the ExecutionConfig last when it has been finalized
-		jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
+		try {
+			jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
+		}
+		catch (IOException e) {
+			throw new IllegalConfigurationException("Could not serialize the ExecutionConfig. " +
+					"This indicates that non-serializable types (like custom serializers) were registered", e);
+		}
 
 		return jobGraph;
 	}