You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/29 08:34:06 UTC

flink git commit: [FLINK-7667] [flip6] Use ArchivedExecutionGraph as serializable AccessExecutionGraph

Repository: flink
Updated Branches:
  refs/heads/master 4902cdca0 -> 2dd557fad


[FLINK-7667] [flip6] Use ArchivedExecutionGraph as serializable AccessExecutionGraph

This commit removes AccessExecutionGraph#getCheckpointCoordinator and replaces
AccessExecutionGraph#getJobCheckpointingSettings with #getCheckpointCoordinatorConfiguration.
The CheckpointCoordinatorConfiguration only contains the CheckpointCoordinator-relevant
configuration settings and excludes the serialized state backend and the
serialized master hooks. That way we don't send unnecessary information when
the ArchivedExecutionGraph is requested.

This closes #4727.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2dd557fa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2dd557fa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2dd557fa

Branch: refs/heads/master
Commit: 2dd557fad4a0a205a3e163fa918507d34c933c6a
Parents: 4902cdc
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Sep 22 13:31:12 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Sep 28 19:06:21 2017 +0200

----------------------------------------------------------------------
 .../api/common/ArchivedExecutionConfig.java     |   3 +
 .../jobmanager/JMXJobManagerMetricTest.java     |  10 +-
 .../checkpoint/CheckpointStatsTracker.java      |  34 ++---
 .../executiongraph/AccessExecutionGraph.java    |  21 ++-
 .../executiongraph/ArchivedExecutionGraph.java  |  18 +--
 .../runtime/executiongraph/ExecutionGraph.java  |  21 +--
 .../executiongraph/ExecutionGraphBuilder.java   |  33 ++---
 .../CheckpointCoordinatorConfiguration.java     | 134 +++++++++++++++++++
 .../tasks/ExternalizedCheckpointSettings.java   |  19 +++
 .../tasks/JobCheckpointingSettings.java         | 102 ++++----------
 .../JobCancellationWithSavepointHandlers.java   |  10 +-
 .../checkpoints/CheckpointConfigHandler.java    |  18 +--
 .../CheckpointSettingsSerializableTest.java     |  17 ++-
 .../checkpoint/CheckpointStatsTrackerTest.java  |  33 ++---
 .../checkpoint/CoordinatorShutdownTest.java     |  31 ++++-
 ...ExecutionGraphCheckpointCoordinatorTest.java |   9 +-
 .../ArchivedExecutionGraphTest.java             |   5 +-
 .../ExecutionGraphDeploymentTest.java           |  25 ++--
 .../tasks/JobCheckpointingSettingsTest.java     |  25 ++--
 .../jobmanager/JobManagerHARecoveryTest.java    |  16 ++-
 .../runtime/jobmanager/JobManagerTest.java      |  72 +++++-----
 .../flink/runtime/jobmanager/JobSubmitTest.java |  16 ++-
 ...obCancellationWithSavepointHandlersTest.java |  45 +++++--
 .../CheckpointConfigHandlerTest.java            |  39 +++---
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../runtime/jobmanager/JobManagerITCase.scala   |  47 ++++---
 .../api/graph/StreamingJobGraphGenerator.java   |  16 ++-
 .../graph/StreamingJobGraphGeneratorTest.java   |   2 +-
 28 files changed, 504 insertions(+), 319 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java
index 700d65f..c013a6b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.common;
 
 import org.apache.flink.annotation.Internal;
@@ -31,6 +32,8 @@ import java.util.Map;
 @Internal
 public class ArchivedExecutionConfig implements Serializable {
 
+	private static final long serialVersionUID = 2126156250920316528L;
+
 	private final String executionMode;
 	private final String restartStrategyDescription;
 	private final int parallelism;

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 08b9373..7280476 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
@@ -82,7 +83,14 @@ public class JMXJobManagerMetricTest {
 				Collections.<JobVertexID>emptyList(),
 				Collections.<JobVertexID>emptyList(),
 				Collections.<JobVertexID>emptyList(),
-				500, 500, 50, 5, ExternalizedCheckpointSettings.none(), null, true));
+				new CheckpointCoordinatorConfiguration(
+					500,
+					500,
+					50,
+					5,
+					ExternalizedCheckpointSettings.none(),
+					true),
+				null));
 
 			flink.waitForActorsToBeAlive();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index 09b8c78..e6386ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -18,20 +18,22 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.annotation.Nullable;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Tracker for checkpoint statistics.
@@ -67,7 +69,7 @@ public class CheckpointStatsTracker {
 	private final int totalSubtaskCount;
 
 	/** Snapshotting settings created from the CheckpointConfig. */
-	private final JobCheckpointingSettings jobCheckpointingSettings;
+	private final CheckpointCoordinatorConfiguration jobCheckpointingConfiguration;
 
 	/** Checkpoint counts. */
 	private final CheckpointStatsCounts counts = new CheckpointStatsCounts();
@@ -104,19 +106,19 @@ public class CheckpointStatsTracker {
 	 *
 	 * @param numRememberedCheckpoints Maximum number of checkpoints to remember, including in progress ones.
 	 * @param jobVertices Job vertices involved in the checkpoints.
-	 * @param jobCheckpointingSettings Snapshotting settings created from the CheckpointConfig.
+	 * @param jobCheckpointingConfiguration Checkpointing configuration.
 	 * @param metricGroup Metric group for exposed metrics
 	 */
 	public CheckpointStatsTracker(
 		int numRememberedCheckpoints,
 		List<ExecutionJobVertex> jobVertices,
-		JobCheckpointingSettings jobCheckpointingSettings,
+		CheckpointCoordinatorConfiguration jobCheckpointingConfiguration,
 		MetricGroup metricGroup) {
 
 		checkArgument(numRememberedCheckpoints >= 0, "Negative number of remembered checkpoints");
 		this.history = new CheckpointStatsHistory(numRememberedCheckpoints);
 		this.jobVertices = checkNotNull(jobVertices, "JobVertices");
-		this.jobCheckpointingSettings = checkNotNull(jobCheckpointingSettings);
+		this.jobCheckpointingConfiguration = checkNotNull(jobCheckpointingConfiguration);
 
 		// Compute the total subtask count. We do this here in order to only
 		// do it once.
@@ -138,13 +140,13 @@ public class CheckpointStatsTracker {
 	}
 
 	/**
-	 * Returns the job's snapshotting settings which are derived from the
+	 * Returns the job's checkpointing configuration which is derived from the
 	 * CheckpointConfig.
 	 *
-	 * @return The job's snapshotting settings.
+	 * @return The job's checkpointing configuration.
 	 */
-	public JobCheckpointingSettings getSnapshottingSettings() {
-		return jobCheckpointingSettings;
+	public CheckpointCoordinatorConfiguration getJobCheckpointingConfiguration() {
+		return jobCheckpointingConfiguration;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index df4ed2e..cab3c92 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -20,13 +20,14 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.util.SerializedValue;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Map;
 
@@ -108,20 +109,13 @@ public interface AccessExecutionGraph {
 	long getStatusTimestamp(JobStatus status);
 
 	/**
-	 * Returns the {@link CheckpointCoordinator} for this execution graph.
-	 *
-	 * @return CheckpointCoordinator for this execution graph or <code>null</code>
-	 * if none is available.
-	 */
-	CheckpointCoordinator getCheckpointCoordinator();
-
-	/**
-	 * Returns the {@link JobCheckpointingSettings} or <code>null</code> if
+	 * Returns the {@link CheckpointCoordinatorConfiguration} or <code>null</code> if
 	 * checkpointing is disabled.
 	 *
-	 * @return JobSnapshottingSettings for this execution graph
+	 * @return CheckpointCoordinatorConfiguration for this execution graph
 	 */
-	JobCheckpointingSettings getJobCheckpointingSettings();
+	@Nullable
+	CheckpointCoordinatorConfiguration getCheckpointCoordinatorConfiguration();
 
 	/**
 	 * Returns a snapshot of the checkpoint statistics or <code>null</code> if
@@ -129,6 +123,7 @@ public interface AccessExecutionGraph {
 	 *
 	 * @return Snapshot of the checkpoint statistics for this execution graph
 	 */
+	@Nullable
 	CheckpointStatsSnapshot getCheckpointStatsSnapshot();
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index 901b973..7f857f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -20,14 +20,14 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.util.SerializedValue;
 
 import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.Iterator;
@@ -81,7 +81,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 	private final Map<String, SerializedValue<Object>> serializedUserAccumulators;
 
 	@Nullable
-	private final JobCheckpointingSettings jobCheckpointingSettings;
+	private final CheckpointCoordinatorConfiguration jobCheckpointingConfiguration;
 
 	@Nullable
 	private final CheckpointStatsSnapshot checkpointStatsSnapshot;
@@ -99,7 +99,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 			Map<String, SerializedValue<Object>> serializedUserAccumulators,
 			ArchivedExecutionConfig executionConfig,
 			boolean isStoppable,
-			@Nullable JobCheckpointingSettings jobCheckpointingSettings,
+			@Nullable CheckpointCoordinatorConfiguration jobCheckpointingConfiguration,
 			@Nullable CheckpointStatsSnapshot checkpointStatsSnapshot) {
 
 		this.jobID = jobID;
@@ -114,7 +114,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 		this.serializedUserAccumulators = serializedUserAccumulators;
 		this.archivedExecutionConfig = executionConfig;
 		this.isStoppable = isStoppable;
-		this.jobCheckpointingSettings = jobCheckpointingSettings;
+		this.jobCheckpointingConfiguration = jobCheckpointingConfiguration;
 		this.checkpointStatsSnapshot = checkpointStatsSnapshot;
 	}
 
@@ -206,12 +206,8 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 	}
 
 	@Override
-	public CheckpointCoordinator getCheckpointCoordinator() {
-		return null;
-	}
-
-	public JobCheckpointingSettings getJobCheckpointingSettings() {
-		return jobCheckpointingSettings;
+	public CheckpointCoordinatorConfiguration getCheckpointCoordinatorConfiguration() {
+		return jobCheckpointingConfiguration;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 2e5f3d1..56733ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -57,18 +57,18 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.StringUtils;
 
@@ -478,7 +478,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
-	@Override
 	public CheckpointCoordinator getCheckpointCoordinator() {
 		return checkpointCoordinator;
 	}
@@ -491,9 +490,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		return restartStrategy;
 	}
 
-	public JobCheckpointingSettings getJobCheckpointingSettings() {
+	@Override
+	public CheckpointCoordinatorConfiguration getCheckpointCoordinatorConfiguration() {
 		if (checkpointStatsTracker != null) {
-			return checkpointStatsTracker.getSnapshottingSettings();
+			return checkpointStatsTracker.getJobCheckpointingConfiguration();
 		} else {
 			return null;
 		}
@@ -731,9 +731,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 		Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
 
-		Map<String, SerializedValue<Object>> result = new HashMap<>();
+		Map<String, SerializedValue<Object>> result = new HashMap<>(accumulatorMap.size());
 		for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
-			result.put(entry.getKey(), new SerializedValue<Object>(entry.getValue().getLocalValue()));
+			result.put(entry.getKey(), new SerializedValue<>(entry.getValue().getLocalValue()));
 		}
 
 		return result;
@@ -1685,8 +1685,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 	@Override
 	public ArchivedExecutionGraph archive() {
-		Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks = new HashMap<>();
-		List<ArchivedExecutionJobVertex> archivedVerticesInCreationOrder = new ArrayList<>();
+		Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks = new HashMap<>(verticesInCreationOrder.size());
+		List<ArchivedExecutionJobVertex> archivedVerticesInCreationOrder = new ArrayList<>(verticesInCreationOrder.size());
+
 		for (ExecutionJobVertex task : verticesInCreationOrder) {
 			ArchivedExecutionJobVertex archivedTask = task.archive();
 			archivedVerticesInCreationOrder.add(archivedTask);
@@ -1714,7 +1715,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			serializedUserAccumulators,
 			getArchivedExecutionConfig(),
 			isStoppable(),
-			getJobCheckpointingSettings(),
+			getCheckpointCoordinatorConfiguration(),
 			getCheckpointStatsSnapshot());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index b79503a..ba66a2b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateBackend;
@@ -213,7 +214,7 @@ public class ExecutionGraphBuilder {
 			CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(
 					historySize,
 					ackVertices,
-					snapshotSettings,
+					snapshotSettings.getCheckpointCoordinatorConfiguration(),
 					metrics);
 
 			// The default directory for externalized checkpoints
@@ -274,21 +275,23 @@ public class ExecutionGraphBuilder {
 				}
 			}
 
+			final CheckpointCoordinatorConfiguration chkConfig = snapshotSettings.getCheckpointCoordinatorConfiguration();
+
 			executionGraph.enableCheckpointing(
-					snapshotSettings.getCheckpointInterval(),
-					snapshotSettings.getCheckpointTimeout(),
-					snapshotSettings.getMinPauseBetweenCheckpoints(),
-					snapshotSettings.getMaxConcurrentCheckpoints(),
-					snapshotSettings.getExternalizedCheckpointSettings(),
-					triggerVertices,
-					ackVertices,
-					confirmVertices,
-					hooks,
-					checkpointIdCounter,
-					completedCheckpoints,
-					externalizedCheckpointsDir,
-					metadataBackend,
-					checkpointStatsTracker);
+				chkConfig.getCheckpointInterval(),
+				chkConfig.getCheckpointTimeout(),
+				chkConfig.getMinPauseBetweenCheckpoints(),
+				chkConfig.getMaxConcurrentCheckpoints(),
+				chkConfig.getExternalizedCheckpointSettings(),
+				triggerVertices,
+				ackVertices,
+				confirmVertices,
+				hooks,
+				checkpointIdCounter,
+				completedCheckpoints,
+				externalizedCheckpointsDir,
+				metadataBackend,
+				checkpointStatsTracker);
 		}
 
 		// create all the metrics for the Execution Graph

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
new file mode 100644
index 0000000..e00a6d4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Configuration settings for the {@link CheckpointCoordinator}. This includes the checkpoint
+ * interval, the checkpoint timeout, the pause between checkpoints, the maximum number of
+ * concurrent checkpoints and settings for externalized checkpoints.
+ */
+public class CheckpointCoordinatorConfiguration implements Serializable {
+
+	private static final long serialVersionUID = -647384516034982626L;
+
+	private final long checkpointInterval;
+
+	private final long checkpointTimeout;
+
+	private final long minPauseBetweenCheckpoints;
+
+	private final int maxConcurrentCheckpoints;
+
+	/** Settings for externalized checkpoints. */
+	private final ExternalizedCheckpointSettings externalizedCheckpointSettings;
+
+	/**
+	 * Flag indicating whether exactly once checkpoint mode has been configured.
+	 * If <code>false</code>, at least once mode has been configured. This is
+	 * not a necessary attribute, because the checkpointing mode is only relevant
+	 * for the stream tasks, but we expose it here to forward it to the web runtime
+	 * UI.
+	 */
+	private final boolean isExactlyOnce;
+
+	public CheckpointCoordinatorConfiguration(
+			long checkpointInterval,
+			long checkpointTimeout,
+			long minPauseBetweenCheckpoints,
+			int maxConcurrentCheckpoints,
+			ExternalizedCheckpointSettings externalizedCheckpointSettings,
+			boolean isExactlyOnce) {
+
+		// sanity checks
+		if (checkpointInterval < 1 || checkpointTimeout < 1 ||
+			minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) {
+			throw new IllegalArgumentException();
+		}
+
+		this.checkpointInterval = checkpointInterval;
+		this.checkpointTimeout = checkpointTimeout;
+		this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+		this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
+		this.externalizedCheckpointSettings = Preconditions.checkNotNull(externalizedCheckpointSettings);
+		this.isExactlyOnce = isExactlyOnce;
+	}
+
+	public long getCheckpointInterval() {
+		return checkpointInterval;
+	}
+
+	public long getCheckpointTimeout() {
+		return checkpointTimeout;
+	}
+
+	public long getMinPauseBetweenCheckpoints() {
+		return minPauseBetweenCheckpoints;
+	}
+
+	public int getMaxConcurrentCheckpoints() {
+		return maxConcurrentCheckpoints;
+	}
+
+	public ExternalizedCheckpointSettings getExternalizedCheckpointSettings() {
+		return externalizedCheckpointSettings;
+	}
+
+	public boolean isExactlyOnce() {
+		return isExactlyOnce;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		CheckpointCoordinatorConfiguration that = (CheckpointCoordinatorConfiguration) o;
+		return checkpointInterval == that.checkpointInterval &&
+			checkpointTimeout == that.checkpointTimeout &&
+			minPauseBetweenCheckpoints == that.minPauseBetweenCheckpoints &&
+			maxConcurrentCheckpoints == that.maxConcurrentCheckpoints &&
+			isExactlyOnce == that.isExactlyOnce &&
+			Objects.equals(externalizedCheckpointSettings, that.externalizedCheckpointSettings);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(checkpointInterval, checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints, externalizedCheckpointSettings, isExactlyOnce);
+	}
+
+	@Override
+	public String toString() {
+		return "JobCheckpointingConfiguration{" +
+			"checkpointInterval=" + checkpointInterval +
+			", checkpointTimeout=" + checkpointTimeout +
+			", minPauseBetweenCheckpoints=" + minPauseBetweenCheckpoints +
+			", maxConcurrentCheckpoints=" + maxConcurrentCheckpoints +
+			", externalizedCheckpointSettings=" + externalizedCheckpointSettings +
+			'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
index b3b487c..f432796 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.annotation.Internal;
 
+import java.util.Objects;
+
 /**
  * Grouped settings for externalized checkpoints.
  */
@@ -67,4 +69,21 @@ public class ExternalizedCheckpointSettings implements java.io.Serializable {
 		return NONE;
 	}
 
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		ExternalizedCheckpointSettings that = (ExternalizedCheckpointSettings) o;
+		return externalizeCheckpoints == that.externalizeCheckpoints &&
+			deleteOnCancellation == that.deleteOnCancellation;
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(externalizeCheckpoints, deleteOnCancellation);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
index cc97e1b..d14e75e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
@@ -21,9 +21,12 @@ package org.apache.flink.runtime.jobgraph.tasks;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
 import javax.annotation.Nullable;
+
+import java.io.Serializable;
 import java.util.List;
 
 import static java.util.Objects.requireNonNull;
@@ -33,7 +36,7 @@ import static java.util.Objects.requireNonNull;
  * for the asynchronous checkpoints of the JobGraph, such as interval, and which vertices
  * need to participate.
  */
-public class JobCheckpointingSettings implements java.io.Serializable {
+public class JobCheckpointingSettings implements Serializable {
 
 	private static final long serialVersionUID = -2593319571078198180L;
 
@@ -43,16 +46,8 @@ public class JobCheckpointingSettings implements java.io.Serializable {
 
 	private final List<JobVertexID> verticesToConfirm;
 
-	private final long checkpointInterval;
-
-	private final long checkpointTimeout;
-
-	private final long minPauseBetweenCheckpoints;
-
-	private final int maxConcurrentCheckpoints;
-
-	/** Settings for externalized checkpoints. */
-	private final ExternalizedCheckpointSettings externalizedCheckpointSettings;
+	/** Contains configuration settings for the CheckpointCoordinator */
+	private final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration;
 
 	/** The default state backend, if configured by the user in the job */
 	@Nullable
@@ -62,61 +57,36 @@ public class JobCheckpointingSettings implements java.io.Serializable {
 	@Nullable
 	private final SerializedValue<MasterTriggerRestoreHook.Factory[]> masterHooks;
 
-	/**
-	 * Flag indicating whether exactly once checkpoint mode has been configured.
-	 * If <code>false</code>, at least once mode has been configured. This is
-	 * not a necessary attribute, because the checkpointing mode is only relevant
-	 * for the stream tasks, but we expose it here to forward it to the web runtime
-	 * UI.
-	 */
-	private final boolean isExactlyOnce;
-
 	public JobCheckpointingSettings(
 			List<JobVertexID> verticesToTrigger,
 			List<JobVertexID> verticesToAcknowledge,
 			List<JobVertexID> verticesToConfirm,
-			long checkpointInterval,
-			long checkpointTimeout,
-			long minPauseBetweenCheckpoints,
-			int maxConcurrentCheckpoints,
-			ExternalizedCheckpointSettings externalizedCheckpointSettings,
-			@Nullable SerializedValue<StateBackend> defaultStateBackend,
-			boolean isExactlyOnce) {
-
-		this(verticesToTrigger, verticesToAcknowledge, verticesToConfirm,
-				checkpointInterval, checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints,
-				externalizedCheckpointSettings, defaultStateBackend, null, isExactlyOnce);
+			CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration,
+			@Nullable SerializedValue<StateBackend> defaultStateBackend) {
+
+		this(
+			verticesToTrigger,
+			verticesToAcknowledge,
+			verticesToConfirm,
+			checkpointCoordinatorConfiguration,
+			defaultStateBackend,
+			null);
 	}
 
 	public JobCheckpointingSettings(
 			List<JobVertexID> verticesToTrigger,
 			List<JobVertexID> verticesToAcknowledge,
 			List<JobVertexID> verticesToConfirm,
-			long checkpointInterval,
-			long checkpointTimeout,
-			long minPauseBetweenCheckpoints,
-			int maxConcurrentCheckpoints,
-			ExternalizedCheckpointSettings externalizedCheckpointSettings,
+			CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration,
 			@Nullable SerializedValue<StateBackend> defaultStateBackend,
-			@Nullable SerializedValue<MasterTriggerRestoreHook.Factory[]> masterHooks,
-			boolean isExactlyOnce) {
+			@Nullable SerializedValue<MasterTriggerRestoreHook.Factory[]> masterHooks) {
 
-		// sanity checks
-		if (checkpointInterval < 1 || checkpointTimeout < 1 ||
-				minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) {
-			throw new IllegalArgumentException();
-		}
 
 		this.verticesToTrigger = requireNonNull(verticesToTrigger);
 		this.verticesToAcknowledge = requireNonNull(verticesToAcknowledge);
 		this.verticesToConfirm = requireNonNull(verticesToConfirm);
-		this.checkpointInterval = checkpointInterval;
-		this.checkpointTimeout = checkpointTimeout;
-		this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
-		this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
-		this.externalizedCheckpointSettings = requireNonNull(externalizedCheckpointSettings);
+		this.checkpointCoordinatorConfiguration = Preconditions.checkNotNull(checkpointCoordinatorConfiguration);
 		this.defaultStateBackend = defaultStateBackend;
-		this.isExactlyOnce = isExactlyOnce;
 		this.masterHooks = masterHooks;
 	}
 
@@ -134,24 +104,8 @@ public class JobCheckpointingSettings implements java.io.Serializable {
 		return verticesToConfirm;
 	}
 
-	public long getCheckpointInterval() {
-		return checkpointInterval;
-	}
-
-	public long getCheckpointTimeout() {
-		return checkpointTimeout;
-	}
-
-	public long getMinPauseBetweenCheckpoints() {
-		return minPauseBetweenCheckpoints;
-	}
-
-	public int getMaxConcurrentCheckpoints() {
-		return maxConcurrentCheckpoints;
-	}
-
-	public ExternalizedCheckpointSettings getExternalizedCheckpointSettings() {
-		return externalizedCheckpointSettings;
+	public CheckpointCoordinatorConfiguration getCheckpointCoordinatorConfiguration() {
+		return checkpointCoordinatorConfiguration;
 	}
 
 	@Nullable
@@ -164,18 +118,14 @@ public class JobCheckpointingSettings implements java.io.Serializable {
 		return masterHooks;
 	}
 
-	public boolean isExactlyOnce() {
-		return isExactlyOnce;
-	}
-
 	// --------------------------------------------------------------------------------------------
 
 	@Override
 	public String toString() {
-		return String.format("SnapshotSettings: interval=%d, timeout=%d, pause-between=%d, " +
-						"maxConcurrent=%d, trigger=%s, ack=%s, commit=%s",
-						checkpointInterval, checkpointTimeout,
-						minPauseBetweenCheckpoints, maxConcurrentCheckpoints,
-						verticesToTrigger, verticesToAcknowledge, verticesToConfirm);
+		return String.format("SnapshotSettings: config=%s, trigger=%s, ack=%s, commit=%s",
+			checkpointCoordinatorConfiguration,
+			verticesToTrigger,
+			verticesToAcknowledge,
+			verticesToConfirm);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
index da4f20b..2750c33 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
@@ -23,9 +23,9 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
 import org.apache.flink.runtime.rest.NotFoundException;
@@ -158,9 +158,9 @@ public class JobCancellationWithSavepointHandlers {
 							() -> new CompletionException(
 								new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.')));
 
-						CheckpointCoordinator coord = graph.getCheckpointCoordinator();
-						if (coord == null) {
-							throw new CompletionException(new FlinkException("Cannot find CheckpointCoordinator for job."));
+						CheckpointCoordinatorConfiguration jobCheckpointingConfiguration = graph.getCheckpointCoordinatorConfiguration();
+						if (jobCheckpointingConfiguration == null) {
+							throw new CompletionException(new FlinkException("Cannot find checkpoint coordinator configuration for job."));
 						}
 
 						String targetDirectory = pathParams.get("targetDirectory");
@@ -176,7 +176,7 @@ public class JobCancellationWithSavepointHandlers {
 						}
 
 						try {
-							return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout());
+							return handleNewRequest(jobManagerGateway, jobId, targetDirectory, jobCheckpointingConfiguration.getCheckpointTimeout());
 						} catch (IOException e) {
 							throw new CompletionException(new FlinkException("Could not cancel job with savepoint.", e));
 						}

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
index 27a64ed..6ab6676 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
 
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
 import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
@@ -85,21 +85,21 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
 	private static String createCheckpointConfigJson(AccessExecutionGraph graph) throws IOException {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-		JobCheckpointingSettings settings = graph.getJobCheckpointingSettings();
+		CheckpointCoordinatorConfiguration jobCheckpointingConfiguration = graph.getCheckpointCoordinatorConfiguration();
 
-		if (settings == null) {
+		if (jobCheckpointingConfiguration == null) {
 			return "{}";
 		}
 
 		gen.writeStartObject();
 		{
-			gen.writeStringField("mode", settings.isExactlyOnce() ? "exactly_once" : "at_least_once");
-			gen.writeNumberField("interval", settings.getCheckpointInterval());
-			gen.writeNumberField("timeout", settings.getCheckpointTimeout());
-			gen.writeNumberField("min_pause", settings.getMinPauseBetweenCheckpoints());
-			gen.writeNumberField("max_concurrent", settings.getMaxConcurrentCheckpoints());
+			gen.writeStringField("mode", jobCheckpointingConfiguration.isExactlyOnce() ? "exactly_once" : "at_least_once");
+			gen.writeNumberField("interval", jobCheckpointingConfiguration.getCheckpointInterval());
+			gen.writeNumberField("timeout", jobCheckpointingConfiguration.getCheckpointTimeout());
+			gen.writeNumberField("min_pause", jobCheckpointingConfiguration.getMinPauseBetweenCheckpoints());
+			gen.writeNumberField("max_concurrent", jobCheckpointingConfiguration.getMaxConcurrentCheckpoints());
 
-			ExternalizedCheckpointSettings externalization = settings.getExternalizedCheckpointSettings();
+			ExternalizedCheckpointSettings externalization = jobCheckpointingConfiguration.getExternalizedCheckpointSettings();
 			gen.writeObjectFieldStart("externalization");
 			{
 				if (externalization.externalizeCheckpoints()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index f597757..84b5774 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -76,14 +77,15 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
 				Collections.<JobVertexID>emptyList(),
 				Collections.<JobVertexID>emptyList(),
 				Collections.<JobVertexID>emptyList(),
-				1000L,
-				10000L,
-				0L,
-				1,
-				ExternalizedCheckpointSettings.none(),
+				new CheckpointCoordinatorConfiguration(
+					1000L,
+					10000L,
+					0L,
+					1,
+					ExternalizedCheckpointSettings.none(),
+					true),
 				new SerializedValue<StateBackend>(new CustomStateBackend(outOfClassPath)),
-				serHooks,
-				true);
+				serHooks);
 
 		final JobGraph jobGraph = new JobGraph(new JobID(), "test job");
 		jobGraph.setSnapshotSettings(checkpointingSettings);
@@ -134,6 +136,7 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
 
 	private static final class CustomStateBackend implements StateBackend {
 
+		private static final long serialVersionUID = -6107964383429395816L;
 		/**
 		 * Simulate a custom option that is not in the normal classpath.
 		 */

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index 10e897a..a47b497 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -44,6 +44,7 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.junit.Test;
 
@@ -62,21 +63,23 @@ public class CheckpointStatsTrackerTest {
 			Collections.singletonList(new JobVertexID()),
 			Collections.singletonList(new JobVertexID()),
 			Collections.singletonList(new JobVertexID()),
-			181238123L,
-			19191992L,
-			191929L,
-			123,
-			ExternalizedCheckpointSettings.none(),
-			null,
-			false);
+			new CheckpointCoordinatorConfiguration(
+				181238123L,
+				19191992L,
+				191929L,
+				123,
+				ExternalizedCheckpointSettings.none(),
+				false
+			),
+			null);
 
 		CheckpointStatsTracker tracker = new CheckpointStatsTracker(
 			0,
 			Collections.singletonList(jobVertex),
-			snapshottingSettings,
+			snapshottingSettings.getCheckpointCoordinatorConfiguration(),
 			new UnregisteredMetricsGroup());
 
-		assertEquals(snapshottingSettings, tracker.getSnapshottingSettings());
+		assertEquals(snapshottingSettings.getCheckpointCoordinatorConfiguration(), tracker.getJobCheckpointingConfiguration());
 	}
 
 	/**
@@ -94,7 +97,7 @@ public class CheckpointStatsTrackerTest {
 		CheckpointStatsTracker tracker = new CheckpointStatsTracker(
 			0,
 			Collections.singletonList(jobVertex),
-			mock(JobCheckpointingSettings.class),
+			mock(CheckpointCoordinatorConfiguration.class),
 			new UnregisteredMetricsGroup());
 
 		PendingCheckpointStats pending = tracker.reportPendingCheckpoint(
@@ -142,7 +145,7 @@ public class CheckpointStatsTrackerTest {
 		CheckpointStatsTracker tracker = new CheckpointStatsTracker(
 			10,
 			Collections.singletonList(jobVertex),
-			mock(JobCheckpointingSettings.class),
+			mock(CheckpointCoordinatorConfiguration.class),
 			new UnregisteredMetricsGroup());
 
 		// Completed checkpoint
@@ -247,7 +250,7 @@ public class CheckpointStatsTrackerTest {
 		CheckpointStatsTracker tracker = new CheckpointStatsTracker(
 			10,
 			Collections.singletonList(jobVertex),
-			mock(JobCheckpointingSettings.class),
+			mock(CheckpointCoordinatorConfiguration.class),
 			new UnregisteredMetricsGroup());
 
 		CheckpointStatsSnapshot snapshot1 = tracker.createSnapshot();
@@ -293,7 +296,7 @@ public class CheckpointStatsTrackerTest {
 		new CheckpointStatsTracker(
 			0,
 			Collections.singletonList(jobVertex),
-			mock(JobCheckpointingSettings.class),
+			mock(CheckpointCoordinatorConfiguration.class),
 			metricGroup);
 
 		verify(metricGroup, times(1)).gauge(eq(CheckpointStatsTracker.NUMBER_OF_CHECKPOINTS_METRIC), any(Gauge.class));
@@ -409,7 +412,7 @@ public class CheckpointStatsTrackerTest {
 		CheckpointStatsTracker stats = new CheckpointStatsTracker(
 			0,
 			Collections.singletonList(jobVertex),
-			mock(JobCheckpointingSettings.class),
+			mock(CheckpointCoordinatorConfiguration.class),
 			metricGroup);
 
 		// Make sure to adjust this test if metrics are added/removed
@@ -534,7 +537,7 @@ public class CheckpointStatsTrackerTest {
 		return new CheckpointStatsTracker(
 			0,
 			Collections.singletonList(jobVertex),
-			mock(JobCheckpointingSettings.class),
+			mock(CheckpointCoordinatorConfiguration.class),
 			new UnregisteredMetricsGroup());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index c58e3a0..c4676e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -67,8 +68,19 @@ public class CoordinatorShutdownTest extends TestLogger {
 			List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
 			
 			JobGraph testGraph = new JobGraph("test job", vertex);
-			testGraph.setSnapshotSettings(new JobCheckpointingSettings(vertexIdList, vertexIdList, vertexIdList, 
-					5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), null, true));
+			testGraph.setSnapshotSettings(
+				new JobCheckpointingSettings(
+					vertexIdList,
+					vertexIdList,
+					vertexIdList,
+					new CheckpointCoordinatorConfiguration(
+						5000,
+						60000,
+						0L,
+						Integer.MAX_VALUE,
+						ExternalizedCheckpointSettings.none(),
+						true),
+					null));
 			
 			ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 
@@ -126,8 +138,19 @@ public class CoordinatorShutdownTest extends TestLogger {
 			List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
 
 			JobGraph testGraph = new JobGraph("test job", vertex);
-			testGraph.setSnapshotSettings(new JobCheckpointingSettings(vertexIdList, vertexIdList, vertexIdList,
-					5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), null, true));
+			testGraph.setSnapshotSettings(
+				new JobCheckpointingSettings(
+					vertexIdList,
+					vertexIdList,
+					vertexIdList,
+					new CheckpointCoordinatorConfiguration(
+						5000,
+						60000,
+						0L,
+						Integer.MAX_VALUE,
+						ExternalizedCheckpointSettings.none(),
+						true),
+					null));
 			
 			ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index a524e5c..e083740 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -104,10 +103,10 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 				100,
 				1,
 				ExternalizedCheckpointSettings.none(),
-				Collections.<ExecutionJobVertex>emptyList(),
-				Collections.<ExecutionJobVertex>emptyList(),
-				Collections.<ExecutionJobVertex>emptyList(),
-				Collections.<MasterTriggerRestoreHook<?>>emptyList(),
+				Collections.emptyList(),
+				Collections.emptyList(),
+				Collections.emptyList(),
+				Collections.emptyList(),
 				counter,
 				store,
 				null,

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 5ffc4ae..81d5df9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -42,9 +42,10 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
+
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -117,7 +118,7 @@ public class ArchivedExecutionGraphTest {
 		CheckpointStatsTracker statsTracker = new CheckpointStatsTracker(
 				0,
 				jobVertices,
-				mock(JobCheckpointingSettings.class),
+				mock(CheckpointCoordinatorConfiguration.class),
 				new UnregisteredMetricsGroup());
 
 		runtimeGraph.enableCheckpointing(

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index a739e14..bbc232d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -59,6 +59,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
@@ -509,17 +510,19 @@ public class ExecutionGraphDeploymentTest {
 
 		final JobID jobId = new JobID();
 		final JobGraph jobGraph = new JobGraph(jobId, "test");
-		jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
-			Collections.<JobVertexID>emptyList(),
-			Collections.<JobVertexID>emptyList(),
-			Collections.<JobVertexID>emptyList(),
-			100,
-			10 * 60 * 1000,
-			0,
-			1,
-			ExternalizedCheckpointSettings.none(),
-			null,
-			false));
+		jobGraph.setSnapshotSettings(
+			new JobCheckpointingSettings(
+				Collections.<JobVertexID>emptyList(),
+				Collections.<JobVertexID>emptyList(),
+				Collections.<JobVertexID>emptyList(),
+				new CheckpointCoordinatorConfiguration(
+					100,
+					10 * 60 * 1000,
+					0,
+					1,
+					ExternalizedCheckpointSettings.none(),
+					false),
+				null));
 
 		return ExecutionGraphBuilder.buildGraph(
 			null,

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
index 097c296..721b8f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.util.SerializedValue;
+
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -42,25 +42,20 @@ public class JobCheckpointingSettingsTest {
 			Arrays.asList(new JobVertexID(), new JobVertexID()),
 			Arrays.asList(new JobVertexID(), new JobVertexID()),
 			Arrays.asList(new JobVertexID(), new JobVertexID()),
-			1231231,
-			1231,
-			112,
-			12,
-			ExternalizedCheckpointSettings.externalizeCheckpoints(true),
-			new SerializedValue<StateBackend>(new MemoryStateBackend()),
-			false);
+			new CheckpointCoordinatorConfiguration(
+				1231231,
+				1231,
+				112,
+				12,
+				ExternalizedCheckpointSettings.externalizeCheckpoints(true),
+				false),
+			new SerializedValue<>(new MemoryStateBackend()));
 
 		JobCheckpointingSettings copy = CommonTestUtils.createCopySerializable(settings);
 		assertEquals(settings.getVerticesToAcknowledge(), copy.getVerticesToAcknowledge());
 		assertEquals(settings.getVerticesToConfirm(), copy.getVerticesToConfirm());
 		assertEquals(settings.getVerticesToTrigger(), copy.getVerticesToTrigger());
-		assertEquals(settings.getCheckpointInterval(), copy.getCheckpointInterval());
-		assertEquals(settings.getCheckpointTimeout(), copy.getCheckpointTimeout());
-		assertEquals(settings.getMinPauseBetweenCheckpoints(), copy.getMinPauseBetweenCheckpoints());
-		assertEquals(settings.getMaxConcurrentCheckpoints(), copy.getMaxConcurrentCheckpoints());
-		assertEquals(settings.getExternalizedCheckpointSettings().externalizeCheckpoints(), copy.getExternalizedCheckpointSettings().externalizeCheckpoints());
-		assertEquals(settings.getExternalizedCheckpointSettings().deleteOnCancellation(), copy.getExternalizedCheckpointSettings().deleteOnCancellation());
-		assertEquals(settings.isExactlyOnce(), copy.isExactlyOnce());
+		assertEquals(settings.getCheckpointCoordinatorConfiguration(), copy.getCheckpointCoordinatorConfiguration());
 		assertNotNull(copy.getDefaultStateBackend());
 		assertTrue(copy.getDefaultStateBackend().deserializeValue(this.getClass().getClassLoader()).getClass() == MemoryStateBackend.class);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index d0af88d..62f070b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -54,6 +54,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
@@ -237,13 +238,14 @@ public class JobManagerHARecoveryTest extends TestLogger {
 					vertexId,
 					vertexId,
 					vertexId,
-					100L,
-					10L * 60L * 1000L,
-					0L,
-					1,
-					ExternalizedCheckpointSettings.none(),
-					null,
-					true));
+					new CheckpointCoordinatorConfiguration(
+						100L,
+						10L * 60L * 1000L,
+						0L,
+						1,
+						ExternalizedCheckpointSettings.none(),
+						true),
+					null));
 
 			BlockingStatefulInvokable.initializeStaticHelpers(slots);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 8b88e11..a6b0581 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -59,6 +59,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -865,16 +866,17 @@ public class JobManagerTest extends TestLogger {
 			JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
 
 			JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings(
-					Collections.singletonList(sourceVertex.getID()),
-					Collections.singletonList(sourceVertex.getID()),
-					Collections.singletonList(sourceVertex.getID()),
+				Collections.singletonList(sourceVertex.getID()),
+				Collections.singletonList(sourceVertex.getID()),
+				Collections.singletonList(sourceVertex.getID()),
+				new CheckpointCoordinatorConfiguration(
 					3600000,
 					3600000,
 					0,
 					Integer.MAX_VALUE,
 					ExternalizedCheckpointSettings.none(),
-					null,
-					true);
+					true),
+					null);
 
 			jobGraph.setSnapshotSettings(snapshottingSettings);
 
@@ -1004,13 +1006,14 @@ public class JobManagerTest extends TestLogger {
 				Collections.singletonList(sourceVertex.getID()),
 				Collections.singletonList(sourceVertex.getID()),
 				Collections.singletonList(sourceVertex.getID()),
-				3600000,
-				3600000,
-				0,
-				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.none(),
-				null,
-				true);
+				new CheckpointCoordinatorConfiguration(
+					3600000,
+					3600000,
+					0,
+					Integer.MAX_VALUE,
+					ExternalizedCheckpointSettings.none(),
+					true),
+				null);
 
 			jobGraph.setSnapshotSettings(snapshottingSettings);
 
@@ -1116,13 +1119,14 @@ public class JobManagerTest extends TestLogger {
 					Collections.singletonList(sourceVertex.getID()),
 					Collections.singletonList(sourceVertex.getID()),
 					Collections.singletonList(sourceVertex.getID()),
-					Long.MAX_VALUE, // deactivated checkpointing
-					360000,
-					0,
-					Integer.MAX_VALUE,
-					ExternalizedCheckpointSettings.none(),
-					null,
-					true);
+					new CheckpointCoordinatorConfiguration(
+						Long.MAX_VALUE, // deactivated checkpointing
+						360000,
+						0,
+						Integer.MAX_VALUE,
+						ExternalizedCheckpointSettings.none(),
+						true),
+					null);
 
 			jobGraph.setSnapshotSettings(snapshottingSettings);
 
@@ -1229,13 +1233,14 @@ public class JobManagerTest extends TestLogger {
 					Collections.singletonList(sourceVertex.getID()),
 					Collections.singletonList(sourceVertex.getID()),
 					Collections.singletonList(sourceVertex.getID()),
-					Long.MAX_VALUE, // deactivated checkpointing
-					360000,
-					0,
-					Integer.MAX_VALUE,
-					ExternalizedCheckpointSettings.none(),
-					null,
-					true);
+					new CheckpointCoordinatorConfiguration(
+						Long.MAX_VALUE, // deactivated checkpointing
+						360000,
+						0,
+						Integer.MAX_VALUE,
+						ExternalizedCheckpointSettings.none(),
+						true),
+					null);
 
 			jobGraph.setSnapshotSettings(snapshottingSettings);
 
@@ -1276,13 +1281,14 @@ public class JobManagerTest extends TestLogger {
 					Collections.singletonList(newSourceVertex.getID()),
 					Collections.singletonList(newSourceVertex.getID()),
 					Collections.singletonList(newSourceVertex.getID()),
-					Long.MAX_VALUE, // deactivated checkpointing
-					360000,
-					0,
-					Integer.MAX_VALUE,
-					ExternalizedCheckpointSettings.none(),
-					null,
-					true);
+					new CheckpointCoordinatorConfiguration(
+						Long.MAX_VALUE, // deactivated checkpointing
+						360000,
+						0,
+						Integer.MAX_VALUE,
+						ExternalizedCheckpointSettings.none(),
+						true),
+					null);
 
 			newJobGraph.setSnapshotSettings(newSnapshottingSettings);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index c79d3ce..ee65951 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -244,8 +245,19 @@ public class JobSubmitTest {
 		List<JobVertexID> vertexIdList = Collections.singletonList(jobVertex.getID());
 
 		JobGraph jg = new JobGraph("test job", jobVertex);
-		jg.setSnapshotSettings(new JobCheckpointingSettings(vertexIdList, vertexIdList, vertexIdList,
-			5000, 5000, 0L, 10, ExternalizedCheckpointSettings.none(), null, true));
+		jg.setSnapshotSettings(
+			new JobCheckpointingSettings(
+				vertexIdList,
+				vertexIdList,
+				vertexIdList,
+				new CheckpointCoordinatorConfiguration(
+					5000,
+					5000,
+					0L,
+					10,
+					ExternalizedCheckpointSettings.none(),
+					true),
+				null));
 		return jg;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
index 8f21af0..8bb1141 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
@@ -21,10 +21,11 @@ package org.apache.flink.runtime.rest.handler.legacy;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.util.TestLogger;
 
@@ -90,10 +91,15 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		JobID jobId = new JobID();
 		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
 		ExecutionGraph graph = mock(ExecutionGraph.class);
-		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
 		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
-		when(graph.getCheckpointCoordinator()).thenReturn(coord);
-		when(coord.getCheckpointTimeout()).thenReturn(timeout);
+		when(graph.getCheckpointCoordinatorConfiguration()).thenReturn(
+			new CheckpointCoordinatorConfiguration(
+				1L,
+				timeout,
+				1L,
+				1,
+				ExternalizedCheckpointSettings.none(),
+				true));
 
 		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
 		JobCancellationWithSavepointHandlers.TriggerHandler handler = handlers.getTriggerHandler();
@@ -119,10 +125,15 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		JobID jobId = new JobID();
 		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
 		ExecutionGraph graph = mock(ExecutionGraph.class);
-		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
 		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
-		when(graph.getCheckpointCoordinator()).thenReturn(coord);
-		when(coord.getCheckpointTimeout()).thenReturn(timeout);
+		when(graph.getCheckpointCoordinatorConfiguration()).thenReturn(
+			new CheckpointCoordinatorConfiguration(
+				1L,
+				timeout,
+				1L,
+				1,
+				ExternalizedCheckpointSettings.none(),
+				true));
 
 		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor, "the-default-directory");
 		JobCancellationWithSavepointHandlers.TriggerHandler handler = handlers.getTriggerHandler();
@@ -167,9 +178,15 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		JobID jobId = new JobID();
 		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
 		ExecutionGraph graph = mock(ExecutionGraph.class);
-		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
 		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
-		when(graph.getCheckpointCoordinator()).thenReturn(coord);
+		when(graph.getCheckpointCoordinatorConfiguration()).thenReturn(
+			new CheckpointCoordinatorConfiguration(
+				1L,
+				1L,
+				1L,
+				1,
+				ExternalizedCheckpointSettings.none(),
+				true));
 
 		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
 		JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler();
@@ -293,9 +310,15 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		JobID jobId = new JobID();
 		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
 		ExecutionGraph graph = mock(ExecutionGraph.class);
-		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
 		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
-		when(graph.getCheckpointCoordinator()).thenReturn(coord);
+		when(graph.getCheckpointCoordinatorConfiguration()).thenReturn(
+			new CheckpointCoordinatorConfiguration(
+				1L,
+				1L,
+				1L,
+				1,
+				ExternalizedCheckpointSettings.none(),
+				true));
 
 		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
 		JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler();

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
index db91f58..7050fa6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
@@ -21,9 +21,8 @@ package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
@@ -54,7 +53,7 @@ public class CheckpointConfigHandlerTest {
 
 		AccessExecutionGraph graph = graphAndSettings.graph;
 		when(graph.getJobID()).thenReturn(new JobID());
-		JobCheckpointingSettings settings = graphAndSettings.snapshottingSettings;
+		CheckpointCoordinatorConfiguration chkConfig = graphAndSettings.jobCheckpointingConfiguration;
 		ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings;
 
 		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph);
@@ -66,10 +65,10 @@ public class CheckpointConfigHandlerTest {
 		JsonNode rootNode = mapper.readTree(archive.getJson());
 
 		Assert.assertEquals("exactly_once", rootNode.get("mode").asText());
-		Assert.assertEquals(settings.getCheckpointInterval(), rootNode.get("interval").asLong());
-		Assert.assertEquals(settings.getCheckpointTimeout(), rootNode.get("timeout").asLong());
-		Assert.assertEquals(settings.getMinPauseBetweenCheckpoints(), rootNode.get("min_pause").asLong());
-		Assert.assertEquals(settings.getMaxConcurrentCheckpoints(), rootNode.get("max_concurrent").asInt());
+		Assert.assertEquals(chkConfig.getCheckpointInterval(), rootNode.get("interval").asLong());
+		Assert.assertEquals(chkConfig.getCheckpointTimeout(), rootNode.get("timeout").asLong());
+		Assert.assertEquals(chkConfig.getMinPauseBetweenCheckpoints(), rootNode.get("min_pause").asLong());
+		Assert.assertEquals(chkConfig.getMaxConcurrentCheckpoints(), rootNode.get("max_concurrent").asInt());
 
 		JsonNode externalizedNode = rootNode.get("externalization");
 		Assert.assertNotNull(externalizedNode);
@@ -94,7 +93,7 @@ public class CheckpointConfigHandlerTest {
 		GraphAndSettings graphAndSettings = createGraphAndSettings(false, true);
 
 		AccessExecutionGraph graph = graphAndSettings.graph;
-		JobCheckpointingSettings settings = graphAndSettings.snapshottingSettings;
+		CheckpointCoordinatorConfiguration chkConfig = graphAndSettings.jobCheckpointingConfiguration;
 
 		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
 		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
@@ -103,10 +102,10 @@ public class CheckpointConfigHandlerTest {
 		JsonNode rootNode = mapper.readTree(json);
 
 		assertEquals("exactly_once", rootNode.get("mode").asText());
-		assertEquals(settings.getCheckpointInterval(), rootNode.get("interval").asLong());
-		assertEquals(settings.getCheckpointTimeout(), rootNode.get("timeout").asLong());
-		assertEquals(settings.getMinPauseBetweenCheckpoints(), rootNode.get("min_pause").asLong());
-		assertEquals(settings.getMaxConcurrentCheckpoints(), rootNode.get("max_concurrent").asInt());
+		assertEquals(chkConfig.getCheckpointInterval(), rootNode.get("interval").asLong());
+		assertEquals(chkConfig.getCheckpointTimeout(), rootNode.get("timeout").asLong());
+		assertEquals(chkConfig.getMinPauseBetweenCheckpoints(), rootNode.get("min_pause").asLong());
+		assertEquals(chkConfig.getMaxConcurrentCheckpoints(), rootNode.get("max_concurrent").asInt());
 
 		JsonNode externalizedNode = rootNode.get("externalization");
 		assertNotNull(externalizedNode);
@@ -160,35 +159,31 @@ public class CheckpointConfigHandlerTest {
 			? ExternalizedCheckpointSettings.externalizeCheckpoints(true)
 			: ExternalizedCheckpointSettings.none();
 
-		JobCheckpointingSettings settings = new JobCheckpointingSettings(
-			Collections.<JobVertexID>emptyList(),
-			Collections.<JobVertexID>emptyList(),
-			Collections.<JobVertexID>emptyList(),
+		CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
 			interval,
 			timeout,
 			minPause,
 			maxConcurrent,
 			externalizedSetting,
-			null,
 			exactlyOnce);
 
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		when(graph.getJobCheckpointingSettings()).thenReturn(settings);
+		when(graph.getCheckpointCoordinatorConfiguration()).thenReturn(chkConfig);
 
-		return new GraphAndSettings(graph, settings, externalizedSetting);
+		return new GraphAndSettings(graph, chkConfig, externalizedSetting);
 	}
 
 	private static class GraphAndSettings {
 		public final AccessExecutionGraph graph;
-		public final JobCheckpointingSettings snapshottingSettings;
+		public final CheckpointCoordinatorConfiguration jobCheckpointingConfiguration;
 		public final ExternalizedCheckpointSettings externalizedSettings;
 
 		public GraphAndSettings(
 				AccessExecutionGraph graph,
-				JobCheckpointingSettings snapshottingSettings,
+				CheckpointCoordinatorConfiguration jobCheckpointingConfiguration,
 				ExternalizedCheckpointSettings externalizedSettings) {
 			this.graph = graph;
-			this.snapshottingSettings = snapshottingSettings;
+			this.jobCheckpointingConfiguration = jobCheckpointingConfiguration;
 			this.externalizedSettings = externalizedSettings;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties
index 7ba1633..98f136a 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=OFF, console
+log4j.rootLogger=INFO, console
 
 # -----------------------------------------------------------------------------
 # Console (use 'console')

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index e209608..61e83be 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.checkpoint.{CheckpointCoordinator, CompletedCheckpoint}
 import org.apache.flink.runtime.client.JobExecutionException
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType
-import org.apache.flink.runtime.jobgraph.tasks.{ExternalizedCheckpointSettings, JobCheckpointingSettings}
+import org.apache.flink.runtime.jobgraph.tasks.{ExternalizedCheckpointSettings, CheckpointCoordinatorConfiguration, JobCheckpointingSettings}
 import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex, ScheduleMode}
 import org.apache.flink.runtime.jobmanager.Tasks._
 import org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableException, SlotSharingGroup}
@@ -837,13 +837,14 @@ class JobManagerITCase(_system: ActorSystem)
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
-            60000,
-            60000,
-            60000,
-            1,
-            ExternalizedCheckpointSettings.none,
-            null,
-            true))
+            new CheckpointCoordinatorConfiguration(
+              60000,
+              60000,
+              60000,
+              1,
+              ExternalizedCheckpointSettings.none,
+              true),
+            null))
 
           // Submit job...
           jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), testActor)
@@ -897,13 +898,14 @@ class JobManagerITCase(_system: ActorSystem)
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
-            60000,
-            60000,
-            60000,
-            1,
-            ExternalizedCheckpointSettings.none,
-            null,
-            true))
+            new CheckpointCoordinatorConfiguration(
+              60000,
+              60000,
+              60000,
+              1,
+              ExternalizedCheckpointSettings.none,
+              true),
+            null))
 
           // Submit job...
           jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), testActor)
@@ -965,13 +967,14 @@ class JobManagerITCase(_system: ActorSystem)
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
-            60000,
-            60000,
-            60000,
-            1,
-            ExternalizedCheckpointSettings.none,
-            null,
-            true))
+            new CheckpointCoordinatorConfiguration(
+              60000,
+              60000,
+              60000,
+              1,
+              ExternalizedCheckpointSettings.none,
+              true),
+            null))
 
           // Submit job...
           jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), testActor)

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 884b899..0364223 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
@@ -682,13 +683,18 @@ public class StreamingJobGraphGenerator {
 		//  --- done, put it all together ---
 
 		JobCheckpointingSettings settings = new JobCheckpointingSettings(
-				triggerVertices, ackVertices, commitVertices, interval,
-				cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
+			triggerVertices,
+			ackVertices,
+			commitVertices,
+			new CheckpointCoordinatorConfiguration(
+				interval,
+				cfg.getCheckpointTimeout(),
+				cfg.getMinPauseBetweenCheckpoints(),
 				cfg.getMaxConcurrentCheckpoints(),
 				externalizedCheckpointSettings,
-				serializedStateBackend,
-				serializedHooks,
-				isExactlyOnce);
+				isExactlyOnce),
+			serializedStateBackend,
+			serializedHooks);
 
 		jobGraph.setSnapshotSettings(settings);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2dd557fa/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 6dd7de7..4e94e05 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -117,7 +117,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
 
 		JobCheckpointingSettings snapshottingSettings = jobGraph.getCheckpointingSettings();
-		assertEquals(Long.MAX_VALUE, snapshottingSettings.getCheckpointInterval());
+		assertEquals(Long.MAX_VALUE, snapshottingSettings.getCheckpointCoordinatorConfiguration().getCheckpointInterval());
 	}
 
 	/**