Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/24 16:09:38 UTC

[5/6] flink git commit: [FLINK-6046] Either store serialized value or permanent blob key in ExecutionGraph and ExecutionJobVertex

[FLINK-6046] Either store serialized value or permanent blob key in ExecutionGraph and ExecutionJobVertex
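
The change replaces the former pair of fields (a SerializedValue plus a nullable PermanentBlobKey) with a single Either<SerializedValue<T>, PermanentBlobKey>: small payloads stay inline in the left branch, large payloads are uploaded to the BLOB server and only their key is kept in the right branch. The following self-contained Java sketch illustrates that decision and how a consumer branches on it; MiniEither, the size threshold and the dummy store are stand-ins invented for illustration and are not Flink classes.

import java.util.function.Function;

// Hypothetical stand-in for org.apache.flink.types.Either (illustration only).
final class MiniEither<L, R> {
	private final L left;
	private final R right;

	private MiniEither(L left, R right) {
		this.left = left;
		this.right = right;
	}

	static <L, R> MiniEither<L, R> left(L value) { return new MiniEither<>(value, null); }
	static <L, R> MiniEither<L, R> right(R value) { return new MiniEither<>(null, value); }

	boolean isLeft() { return left != null; }
	L left() { return left; }
	R right() { return right; }
}

public class OffloadPatternSketch {

	// Assumed threshold for this sketch; the real value comes from BlobServerOptions.OFFLOAD_MINSIZE.
	private static final int MIN_OFFLOAD_SIZE = 1_024;

	// Mirrors the shape of the new BlobServer#tryOffload: small payloads are kept
	// inline (left), large payloads go to a store and only a key is kept (right).
	static MiniEither<byte[], String> tryOffload(byte[] serialized, Function<byte[], String> store) {
		if (serialized.length < MIN_OFFLOAD_SIZE) {
			return MiniEither.left(serialized);
		} else {
			return MiniEither.right(store.apply(serialized));
		}
	}

	public static void main(String[] args) {
		Function<byte[], String> dummyStore = bytes -> "blob-key-" + bytes.length;

		MiniEither<byte[], String> small = tryOffload(new byte[100], dummyStore);
		MiniEither<byte[], String> large = tryOffload(new byte[10_000], dummyStore);

		// A consumer (ExecutionVertex in the real code) branches on the Either to build
		// either a NonOffloaded or an Offloaded TaskDeploymentDescriptor entry.
		for (MiniEither<byte[], String> e : java.util.Arrays.asList(small, large)) {
			System.out.println(e.isLeft()
				? "kept inline: " + e.left().length + " bytes"
				: "offloaded under " + e.right());
		}
	}
}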


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ff07e63
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ff07e63
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ff07e63

Branch: refs/heads/master
Commit: 5ff07e63d1e9a98959e5edf66872222b847d23d5
Parents: 315badc
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Oct 20 23:39:14 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Oct 24 18:08:31 2017 +0200

----------------------------------------------------------------------
 .../flink/configuration/BlobServerOptions.java  |   1 +
 .../flink/configuration/JobManagerOptions.java  |   7 -
 .../apache/flink/runtime/blob/BlobServer.java   |  39 ++++-
 .../runtime/executiongraph/ExecutionGraph.java  | 151 +++++++------------
 .../executiongraph/ExecutionGraphBuilder.java   |  41 ++---
 .../executiongraph/ExecutionJobVertex.java      | 101 ++++---------
 .../runtime/executiongraph/ExecutionVertex.java |  51 ++++---
 .../ExecutionGraphDeploymentTest.java           |  49 +++---
 ...ecutionGraphDeploymentWithBlobCacheTest.java |   4 +-
 ...cutionGraphDeploymentWithBlobServerTest.java |  21 +--
 .../executiongraph/FailoverRegionTest.java      |  16 +-
 .../executiongraph/GlobalModVersionTest.java    |   4 +-
 .../IndividualRestartsConcurrencyTest.java      |   4 +-
 .../PipelinedRegionFailoverConcurrencyTest.java |   8 +-
 .../RestartPipelinedRegionStrategyTest.java     |  32 ++--
 .../partitioner/RescalePartitionerTest.java     |   9 +-
 16 files changed, 253 insertions(+), 285 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
index 20b7303..8680096 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.PublicEvolving;

http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index a39927d..ef3306e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -96,13 +96,6 @@ public class JobManagerOptions {
 		key("jobmanager.archive.fs.dir")
 			.noDefaultValue();
 
-	/**
-	 * The maximum size of the <tt>TaskDeploymentDescriptor</tt>'s serialized task and job
-	 * information to still transmit them via RPC. Larger blobs may be offloaded to the BLOB server.
-	 */
-	public static final ConfigOption<Integer> TDD_OFFLOAD_MINSIZE = key("jobmanager.tdd.offload.minsize")
-		.defaultValue(1_024); // 1KiB by default
-
 	// ---------------------------------------------------------------------------------------------
 
 	private JobManagerOptions() {

http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 7249c8b..0f6b350 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -24,9 +24,11 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.types.Either;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -884,8 +886,8 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	 *
 	 * @return configuration
 	 */
-	public final Configuration getConfiguration() {
-		return blobServiceConfiguration;
+	public final int getMinOffloadingSize() {
+		return blobServiceConfiguration.getInteger(BlobServerOptions.OFFLOAD_MINSIZE);
 	}
 
 	/**
@@ -941,4 +943,37 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 			return new ArrayList<>(activeConnections);
 		}
 	}
+
+	/**
+	 * Serializes the given value and offloads it to the BlobServer if its size exceeds the minimum
+	 * offloading size of the BlobServer.
+	 *
+	 * @param value to serialize
+	 * @param jobId to which the value belongs.
+	 * @param blobServer the BLOB server to offload to, or <tt>null</tt> if offloading is disabled
+	 * @param <T> type of the value to serialize
+	 * @return either the serialized value (if kept inline) or the permanent BLOB key of the offloaded value
+	 * @throws IOException if the value cannot be serialized
+	 */
+	public static <T> Either<SerializedValue<T>, PermanentBlobKey> tryOffload(
+		T value,
+		JobID jobId,
+		@Nullable BlobServer blobServer) throws IOException {
+
+		final SerializedValue<T> serializedValue = new SerializedValue<>(value);
+
+		if (blobServer == null || serializedValue.getByteArray().length < blobServer.getMinOffloadingSize()) {
+			return Either.Left(new SerializedValue<>(value));
+		} else {
+			try {
+				final PermanentBlobKey permanentBlobKey = blobServer.putPermanent(jobId, serializedValue.getByteArray());
+
+				return Either.Right(permanentBlobKey);
+			} catch (IOException e) {
+				LOG.warn("Failed to offload value " + value + " for job " + jobId + " to BLOB store.", e);
+
+				return Either.Left(serializedValue);
+			}
+		}
+	}
 }
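
As a rough usage sketch of the new helper (relying only on the signatures visible in this commit, with flink-runtime at this revision on the classpath; the close() call at the end is an assumed shutdown step and not part of this diff):

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.types.Either;
import org.apache.flink.util.SerializedValue;

public class TryOffloadSketch {

	public static void main(String[] args) throws Exception {
		Configuration config = new Configuration();
		// Force offloading of everything, as the BLOB-based tests in this commit do.
		config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);

		BlobServer blobServer = new BlobServer(config, new VoidBlobStore());
		blobServer.start();

		try {
			JobID jobId = new JobID();

			// The caller keeps the Either instead of the raw value.
			Either<SerializedValue<String>, PermanentBlobKey> valueOrKey =
				BlobServer.tryOffload("some serializable payload", jobId, blobServer);

			if (valueOrKey.isLeft()) {
				// small payload: shipped directly inside the RPC message
				System.out.println("inline: " + valueOrKey.left().getByteArray().length + " bytes");
			} else {
				// large payload: only the key travels; TaskManagers fetch the bytes from the BLOB server
				System.out.println("offloaded under " + valueOrKey.right());
			}
		} finally {
			blobServer.close(); // assumed shutdown path; not shown in this diff
		}
	}
}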

http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index fe7770b..74a68ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.StoppingException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
@@ -58,17 +57,17 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.types.Either;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.SerializedValue;
@@ -78,6 +77,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
@@ -174,17 +174,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/** Job specific information like the job id, job name, job configuration, etc. */
 	private final JobInformation jobInformation;
 
-	/** Serialized version of the job specific information. This is done to avoid multiple
-	 * serializations of the same data when creating a TaskDeploymentDescriptor.
-	 */
-	private final SerializedValue<JobInformation> serializedJobInformation;
-
-	/**
-	 * The key of the offloaded job information BLOB containing {@link #serializedJobInformation} or
-	 * <tt>null</tt> if not offloaded.
-	 */
-	@Nullable
-	private final PermanentBlobKey jobInformationBlobKey;
+	private final Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey;
 
 	/** The executor which is used to execute futures. */
 	private final ScheduledExecutorService futureExecutor;
@@ -245,10 +235,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/** The total number of vertices currently in the execution graph */
 	private int numVerticesTotal;
 
-	/** Blob server reference for offloading large RPC messages. */
-	@Nullable
-	private final BlobServer blobServer;
-
 	// ------ Configuration of the Execution -------
 
 	/** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able
@@ -290,6 +276,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	// ------ Fields that are only relevant for archived execution graphs ------------
 	private String jsonPlan;
 
+	@Nullable
+	private BlobServer blobServer;
+
 	// --------------------------------------------------------------------------------------------
 	//   Constructors
 	// --------------------------------------------------------------------------------------------
@@ -307,10 +296,21 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			SerializedValue<ExecutionConfig> serializedConfig,
 			Time timeout,
 			RestartStrategy restartStrategy,
-			SlotProvider slotProvider) {
+			SlotProvider slotProvider) throws IOException {
 
-		this(futureExecutor, ioExecutor, jobId, jobName, jobConfig, serializedConfig, timeout,
-			restartStrategy, slotProvider, null);
+		this(
+			new JobInformation(
+				jobId,
+				jobName,
+				serializedConfig,
+				jobConfig,
+				Collections.emptyList(),
+				Collections.emptyList()),
+			futureExecutor,
+			ioExecutor,
+			timeout,
+			restartStrategy,
+			slotProvider);
 	}
 
 	/**
@@ -318,33 +318,41 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 */
 	@VisibleForTesting
 	ExecutionGraph(
+			JobInformation jobInformation,
 			ScheduledExecutorService futureExecutor,
 			Executor ioExecutor,
-			JobID jobId,
-			String jobName,
-			Configuration jobConfig,
-			SerializedValue<ExecutionConfig> serializedConfig,
 			Time timeout,
 			RestartStrategy restartStrategy,
-			SlotProvider slotProvider,
-			@Nullable BlobServer blobServer) {
+			SlotProvider slotProvider) throws IOException {
 		this(
-			new JobInformation(
-				jobId,
-				jobName,
-				serializedConfig,
-				jobConfig,
-				Collections.emptyList(),
-				Collections.emptyList()),
+			jobInformation,
 			futureExecutor,
 			ioExecutor,
 			timeout,
 			restartStrategy,
 			new RestartAllStrategy.Factory(),
+			slotProvider);
+	}
+
+	@VisibleForTesting
+	ExecutionGraph(
+			JobInformation jobInformation,
+			ScheduledExecutorService futureExecutor,
+			Executor ioExecutor,
+			Time timeout,
+			RestartStrategy restartStrategy,
+			FailoverStrategy.Factory failoverStrategy,
+			SlotProvider slotProvider) throws IOException {
+		this(
+			jobInformation,
+			futureExecutor,
+			ioExecutor,
+			timeout,
+			restartStrategy,
+			failoverStrategy,
 			slotProvider,
 			ExecutionGraph.class.getClassLoader(),
-			blobServer
-		);
+			null);
 	}
 
 	public ExecutionGraph(
@@ -356,21 +364,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			FailoverStrategy.Factory failoverStrategyFactory,
 			SlotProvider slotProvider,
 			ClassLoader userClassLoader,
-			@Nullable BlobServer blobServer) {
+			@Nullable BlobServer blobServer) throws IOException {
 
 		checkNotNull(futureExecutor);
 
 		this.jobInformation = Preconditions.checkNotNull(jobInformation);
 
-		// serialize the job information to do the serialisation work only once
-		try {
-			this.serializedJobInformation = new SerializedValue<>(jobInformation);
-		}
-		catch (IOException e) {
-			// this cannot happen because 'JobInformation' is perfectly serializable
-			// rethrow unchecked, because this indicates a bug, not a recoverable situation
-			throw new FlinkRuntimeException("Bug: Cannot serialize JobInformation", e);
-		}
+		this.jobInformationOrBlobKey = BlobServer.tryOffload(jobInformation, jobInformation.getJobId(), blobServer);
 
 		this.futureExecutor = Preconditions.checkNotNull(futureExecutor);
 		this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
@@ -405,50 +405,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName());
 
 		this.blobServer = blobServer;
-		this.jobInformationBlobKey = tryOffLoadJobInformation();
-	}
-
-	/**
-	 * Tries to store {@link #serializedJobInformation} and in the graph's {@link
-	 * ExecutionGraph#blobServer} (if not <tt>null</tt>) so that RPC messages do not need to include
-	 * it.
-	 *
-	 * @return the BLOB key of the uploaded job information or <tt>null</tt> if the upload failed
-	 */
-	@Nullable
-	private PermanentBlobKey tryOffLoadJobInformation() {
-		if (blobServer == null) {
-			return null;
-		}
-
-		// If the serialized job information inside serializedJobInformation is larger than this,
-		// we try to offload it to the BLOB server.
-		final int rpcOffloadMinSize =
-			blobServer.getConfiguration().getInteger(JobManagerOptions.TDD_OFFLOAD_MINSIZE);
-
-		if (serializedJobInformation.getByteArray().length > rpcOffloadMinSize) {
-			LOG.info("Storing job {} information at the BLOB server", getJobID());
-
-			// TODO: do not overwrite existing job info and thus speed up recovery?
-			try {
-				return blobServer.putPermanent(getJobID(), serializedJobInformation.getByteArray());
-			} catch (IOException e) {
-				LOG.warn("Failed to offload job " + getJobID() + " information data to BLOB store", e);
-			}
-		}
-
-		return null;
-	}
-
-	/**
-	 * Returns the key of the offloaded job information BLOB containing {@link
-	 * #serializedJobInformation}.
-	 *
-	 * @return the BLOB key or <tt>null</tt> if not offloaded
-	 */
-	@Nullable
-	public PermanentBlobKey getJobInformationBlobKey() {
-		return jobInformationBlobKey;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -633,8 +589,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		return slotProvider;
 	}
 
-	public SerializedValue<JobInformation> getSerializedJobInformation() {
-		return serializedJobInformation;
+	public Either<SerializedValue<JobInformation>, PermanentBlobKey> getJobInformationOrBlobKey() {
+		return jobInformationOrBlobKey;
 	}
 
 	@Override
@@ -749,6 +705,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		return this.stateTimestamps[status.ordinal()];
 	}
 
+	@Nullable
 	public final BlobServer getBlobServer() {
 		return blobServer;
 	}
@@ -843,8 +800,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			}
 
 			// create the execution job vertex and attach it to the graph
-			ExecutionJobVertex ejv =
-					new ExecutionJobVertex(this, jobVertex, 1, rpcCallTimeout, globalModVersion, createTimestamp);
+			ExecutionJobVertex ejv = new ExecutionJobVertex(
+				this,
+				jobVertex,
+				1,
+				rpcCallTimeout,
+				globalModVersion,
+				createTimestamp);
+
 			ejv.connectToPredecessors(this.intermediateResults);
 
 			ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);

http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 42fbfc1..8d48432 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -103,24 +103,31 @@ public class ExecutionGraphBuilder {
 		final FailoverStrategy.Factory failoverStrategy =
 				FailoverStrategyLoader.loadFailoverStrategy(jobManagerConfig, log);
 
+		final JobInformation jobInformation = new JobInformation(
+			jobId,
+			jobName,
+			jobGraph.getSerializedExecutionConfig(),
+			jobGraph.getJobConfiguration(),
+			jobGraph.getUserJarBlobKeys(),
+			jobGraph.getClasspaths());
+
 		// create a new execution graph, if none exists so far
-		final ExecutionGraph executionGraph = (prior != null) ? prior :
-				new ExecutionGraph(
-					new JobInformation(
-						jobId,
-						jobName,
-						jobGraph.getSerializedExecutionConfig(),
-						jobGraph.getJobConfiguration(),
-						jobGraph.getUserJarBlobKeys(),
-						jobGraph.getClasspaths()),
-					futureExecutor,
-					ioExecutor,
-					timeout,
-					restartStrategy,
-					failoverStrategy,
-					slotProvider,
-					classLoader,
-					blobServer);
+		final ExecutionGraph executionGraph;
+		try {
+			executionGraph = (prior != null) ? prior :
+				new ExecutionGraph(
+					jobInformation,
+					futureExecutor,
+					ioExecutor,
+					timeout,
+					restartStrategy,
+					failoverStrategy,
+					slotProvider,
+					classLoader,
+					blobServer);
+		} catch (IOException e) {
+			throw new JobException("Could not create the ExecutionGraph.", e);
+		}
 
 		// set the basic properties
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 33a4359..9adaf45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -46,8 +46,10 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.types.Either;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
+
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
@@ -131,6 +133,8 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	@Nullable
 	private PermanentBlobKey taskInformationBlobKey = null;
 
+	private Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey = null;
+
 	private InputSplitAssigner splitAssigner;
 
 	/**
@@ -147,12 +151,12 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	}
 
 	public ExecutionJobVertex(
-		ExecutionGraph graph,
-		JobVertex jobVertex,
-		int defaultParallelism,
-		Time timeout,
-		long initialGlobalModVersion,
-		long createTimestamp) throws JobException {
+			ExecutionGraph graph,
+			JobVertex jobVertex,
+			int defaultParallelism,
+			Time timeout,
+			long initialGlobalModVersion,
+			long createTimestamp) throws JobException {
 
 		if (graph == null || jobVertex == null) {
 			throw new NullPointerException();
@@ -359,80 +363,29 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		return inputs;
 	}
 
-	public SerializedValue<TaskInformation> getSerializedTaskInformation() throws IOException {
-
+	public Either<SerializedValue<TaskInformation>, PermanentBlobKey> getTaskInformationOrBlobKey() throws IOException {
 		// only one thread should offload the task information, so let's also let only one thread
 		// serialize the task information!
 		synchronized (stateMonitor) {
-			if (null == serializedTaskInformation) {
-
-				int parallelism = getParallelism();
-				int maxParallelism = getMaxParallelism();
-
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Creating task information for " + generateDebugString());
-				}
-
-				serializedTaskInformation = new SerializedValue<>(
-					new TaskInformation(
-						jobVertex.getID(),
-						jobVertex.getName(),
-						parallelism,
-						maxParallelism,
-						jobVertex.getInvokableClassName(),
-						jobVertex.getConfiguration()));
-
-				taskInformationBlobKey = tryOffLoadTaskInformation();
-			}
-		}
-
-		return serializedTaskInformation;
-	}
-
-	/**
-	 * Returns the key of the offloaded task information BLOB containing {@link
-	 * #serializedTaskInformation}.
-	 * <p>
-	 * This may be true after the first call to {@link #getSerializedTaskInformation()}.
-	 *
-	 * @return the BLOB key or <tt>null</tt> if not offloaded
-	 */
-	@Nullable
-	public PermanentBlobKey getTaskInformationBlobKey() {
-		return taskInformationBlobKey;
-	}
-
-	/**
-	 * Tries to store {@link #serializedTaskInformation} and in the graph's {@link
-	 * ExecutionGraph#blobServer} (if not <tt>null</tt>) so that RPC messages do not need to include
-	 * it.
-	 *
-	 * @return the BLOB key of the uploaded task information or <tt>null</tt> if the upload failed
-	 */
-	@Nullable
-	private PermanentBlobKey tryOffLoadTaskInformation() {
-		BlobServer blobServer = graph.getBlobServer();
-		if (blobServer == null) {
-			return null;
-		}
-
-		// If the serialized task information inside #serializedTaskInformation is larger than this,
-		// we try to offload it to the BLOB server.
-		final int rpcOffloadMinSize =
-			blobServer.getConfiguration().getInteger(JobManagerOptions.TDD_OFFLOAD_MINSIZE);
-
-		if (serializedTaskInformation.getByteArray().length > rpcOffloadMinSize) {
-			LOG.info("Storing task {} information at the BLOB server", getJobVertexId());
-
-			// TODO: do not overwrite existing task info and thus speed up recovery?
-			try {
-				return blobServer.putPermanent(getJobId(), serializedTaskInformation.getByteArray());
-			} catch (IOException e) {
-				LOG.warn("Failed to offload task " + getJobVertexId() + " information data to BLOB store", e);
+			if (taskInformationOrBlobKey == null) {
+				final BlobServer blobServer = graph.getBlobServer();
+
+				final TaskInformation taskInformation = new TaskInformation(
+					jobVertex.getID(),
+					jobVertex.getName(),
+					parallelism,
+					maxParallelism,
+					jobVertex.getInvokableClassName(),
+					jobVertex.getConfiguration());
+
+				taskInformationOrBlobKey = BlobServer.tryOffload(
+					taskInformation,
+					getJobId(),
+					blobServer);
 			}
 		}
 
-		return null;
+		return taskInformationOrBlobKey;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 17ad3c8..6b9d481 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -46,8 +46,10 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EvictingBoundedList;
+import org.apache.flink.types.Either;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
 
@@ -770,33 +772,32 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			consumedPartitions.add(new InputGateDeploymentDescriptor(resultId, partitionType, queueToRequest, partitions));
 		}
 
-		TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> serializedJobInformation;
-		{
-			PermanentBlobKey jobInfoBlobKey = getExecutionGraph().getJobInformationBlobKey();
-			if (jobInfoBlobKey != null) {
-				serializedJobInformation =
-					new TaskDeploymentDescriptor.Offloaded<>(jobInfoBlobKey);
-			} else {
-				serializedJobInformation = new TaskDeploymentDescriptor.NonOffloaded<>(
-					getExecutionGraph().getSerializedJobInformation());
-			}
+		final Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey = getExecutionGraph().getJobInformationOrBlobKey();
+
+		final TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> serializedJobInformation;
+
+		if (jobInformationOrBlobKey.isLeft()) {
+			serializedJobInformation = new TaskDeploymentDescriptor.NonOffloaded<>(jobInformationOrBlobKey.left());
+		} else {
+			serializedJobInformation = new TaskDeploymentDescriptor.Offloaded<>(jobInformationOrBlobKey.right());
 		}
 
-		TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> serializedTaskInformation;
-		{
-			PermanentBlobKey taskInfoBlobKey = jobVertex.getTaskInformationBlobKey();
-			if (taskInfoBlobKey != null) {
-				serializedTaskInformation = new TaskDeploymentDescriptor.Offloaded<>(taskInfoBlobKey);
-			} else {
-				try {
-					serializedTaskInformation = new TaskDeploymentDescriptor.NonOffloaded<>(
-						jobVertex.getSerializedTaskInformation());
-				} catch (IOException e) {
-					throw new ExecutionGraphException(
-						"Could not create a serialized JobVertexInformation for " +
-							jobVertex.getJobVertexId(), e);
-				}
-			}
+		final Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey;
+
+		try {
+			taskInformationOrBlobKey = jobVertex.getTaskInformationOrBlobKey();
+		} catch (IOException e) {
+			throw new ExecutionGraphException(
+				"Could not create a serialized JobVertexInformation for " +
+					jobVertex.getJobVertexId(), e);
+		}
+
+		final TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> serializedTaskInformation;
+
+		if (taskInformationOrBlobKey.isLeft()) {
+			serializedTaskInformation = new TaskDeploymentDescriptor.NonOffloaded<>(taskInformationOrBlobKey.left());
+		} else {
+			serializedTaskInformation = new TaskDeploymentDescriptor.Offloaded<>(taskInformationOrBlobKey.right());
 		}
 
 		return new TaskDeploymentDescriptor(

http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 3af4171..b9ca508 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.IntCounter;
@@ -36,6 +35,7 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -54,7 +54,7 @@ import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
@@ -68,7 +68,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 
-import static junit.framework.TestCase.assertNull;
+import static junit.framework.TestCase.assertTrue;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -78,7 +78,7 @@ import static org.junit.Assert.fail;
 /**
  * Tests for {@link ExecutionGraph} deployment.
  */
-public class ExecutionGraphDeploymentTest {
+public class ExecutionGraphDeploymentTest extends TestLogger {
 
 	/**
 	 * BLOB server instance to use for the job graph (may be <tt>null</tt>).
@@ -98,7 +98,7 @@ public class ExecutionGraphDeploymentTest {
 	 * @param eg           the execution graph that was created
 	 */
 	protected void checkJobOffloaded(ExecutionGraph eg) throws Exception {
-		assertNull(eg.getJobInformationBlobKey());
+		assertTrue(eg.getJobInformationOrBlobKey().isLeft());
 	}
 
 	/**
@@ -109,7 +109,7 @@ public class ExecutionGraphDeploymentTest {
 	 * @param jobVertexId  job vertex ID
 	 */
 	protected void checkTaskOffloaded(ExecutionGraph eg, JobVertexID jobVertexId) throws Exception {
-		assertNull(eg.getJobVertex(jobVertexId).getTaskInformationBlobKey());
+		assertTrue(eg.getJobVertex(jobVertexId).getTaskInformationOrBlobKey().isLeft());
 	}
 
 	@Test
@@ -141,17 +141,21 @@ public class ExecutionGraphDeploymentTest {
 			v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
 			v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
 
+			final JobInformation expectedJobInformation = new DummyJobInformation(
+				jobId,
+				"some job");
+
 			ExecutionGraph eg = new ExecutionGraph(
+				expectedJobInformation,
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
-				jobId,
-				"some job",
-				new Configuration(),
-				new SerializedValue<>(new ExecutionConfig()),
 				AkkaUtils.getDefaultTimeout(),
 				new NoRestartStrategy(),
+				new RestartAllStrategy.Factory(),
 				new Scheduler(TestingUtils.defaultExecutionContext()),
+				ExecutionGraph.class.getClassLoader(),
 				blobServer);
+
 			checkJobOffloaded(eg);
 
 			List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
@@ -387,12 +391,12 @@ public class ExecutionGraphDeploymentTest {
 		}
 	}
 
-	@Test
 	/**
 	 * Tests that a blocking batch job fails if there are not enough resources left to schedule the
 	 * succeeding tasks. This test case is related to [FLINK-4296] where finished producing tasks
 	 * swallow the fail exception when scheduling a consumer task.
 	 */
+	@Test
 	public void testNoResourceAvailableFailure() throws Exception {
 		final JobID jobId = new JobID();
 		JobVertex v1 = new JobVertex("source");
@@ -418,18 +422,22 @@ public class ExecutionGraphDeploymentTest {
 							TestingUtils.directExecutionContext()))));
 		}
 
+		final JobInformation jobInformation = new DummyJobInformation(
+			jobId,
+			"failing test job");
+
 		// execution graph that executes actions synchronously
 		ExecutionGraph eg = new ExecutionGraph(
+			jobInformation,
 			new DirectScheduledExecutorService(),
 			TestingUtils.defaultExecutor(),
-			jobId,
-			"failing test job",
-			new Configuration(),
-			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
+			new RestartAllStrategy.Factory(),
 			scheduler,
+			ExecutionGraph.class.getClassLoader(),
 			blobServer);
+
 		checkJobOffloaded(eg);
 
 		eg.setQueuedSchedulingAllowed(false);
@@ -495,17 +503,20 @@ public class ExecutionGraphDeploymentTest {
 											TestingUtils.directExecutionContext()))));
 		}
 
+		final JobInformation jobInformation = new DummyJobInformation(
+			jobId,
+			"some job");
+
 		// execution graph that executes actions synchronously
 		ExecutionGraph eg = new ExecutionGraph(
+			jobInformation,
 			new DirectScheduledExecutorService(),
 			TestingUtils.defaultExecutor(),
-			jobId, 
-			"some job",
-			new Configuration(),
-			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
+			new RestartAllStrategy.Factory(),
 			scheduler,
+			ExecutionGraph.class.getClassLoader(),
 			blobServer);
 		checkJobOffloaded(eg);
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java
index f1c071e..0fcf8c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.VoidBlobStore;
@@ -41,7 +41,7 @@ public class ExecutionGraphDeploymentWithBlobCacheTest extends ExecutionGraphDep
 	public void setupBlobServer() throws IOException {
 		Configuration config = new Configuration();
 		// always offload the serialized job and task information
-		config.setInteger(JobManagerOptions.TDD_OFFLOAD_MINSIZE, 0);
+		config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
 		blobServer = new BlobServer(config, new VoidBlobStore());
 		blobServer.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java
index 030e18d..59d8bc2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java
@@ -19,12 +19,14 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.SerializedValue;
 
 import org.junit.After;
 import org.junit.Before;
@@ -36,7 +38,6 @@ import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -53,7 +54,7 @@ public class ExecutionGraphDeploymentWithBlobServerTest extends ExecutionGraphDe
 	public void setupBlobServer() throws IOException {
 		Configuration config = new Configuration();
 		// always offload the serialized job and task information
-		config.setInteger(JobManagerOptions.TDD_OFFLOAD_MINSIZE, 0);
+		config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
 		blobServer = Mockito.spy(new BlobServer(config, new VoidBlobStore()));
 
 		seenHashes.clear();
@@ -81,19 +82,21 @@ public class ExecutionGraphDeploymentWithBlobServerTest extends ExecutionGraphDe
 
 	@Override
 	protected void checkJobOffloaded(ExecutionGraph eg) throws Exception {
-		PermanentBlobKey jobInformationBlobKey = eg.getJobInformationBlobKey();
-		assertNotNull(jobInformationBlobKey);
+		Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey = eg.getJobInformationOrBlobKey();
+
+		assertTrue(jobInformationOrBlobKey.isRight());
 
 		// must not throw:
-		blobServer.getFile(eg.getJobID(), jobInformationBlobKey);
+		blobServer.getFile(eg.getJobID(), jobInformationOrBlobKey.right());
 	}
 
 	@Override
 	protected void checkTaskOffloaded(ExecutionGraph eg, JobVertexID jobVertexId) throws Exception {
-		PermanentBlobKey taskInformationBlobKey = eg.getJobVertex(jobVertexId).getTaskInformationBlobKey();
-		assertNotNull(taskInformationBlobKey);
+		Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey = eg.getJobVertex(jobVertexId).getTaskInformationOrBlobKey();
+
+		assertTrue(taskInformationOrBlobKey.isRight());
 
 		// must not throw:
-		blobServer.getFile(eg.getJobID(), taskInformationBlobKey);
+		blobServer.getFile(eg.getJobID(), taskInformationOrBlobKey.right());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index 1f20e12..4e89d43 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -133,9 +133,7 @@ public class FailoverRegionTest extends TestLogger {
 			AkkaUtils.getDefaultTimeout(),
 			new InfiniteDelayRestartStrategy(10),
 			new FailoverPipelinedRegionWithDirectExecutor(),
-			slotProvider,
-			ExecutionGraph.class.getClassLoader(),
-			null);
+			slotProvider);
 
 		eg.attachJobGraph(ordered);
 
@@ -257,9 +255,7 @@ public class FailoverRegionTest extends TestLogger {
 				AkkaUtils.getDefaultTimeout(),
 				new InfiniteDelayRestartStrategy(10),
 				new RestartPipelinedRegionStrategy.Factory(),
-				scheduler,
-				ExecutionGraph.class.getClassLoader(),
-				null);
+				scheduler);
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -331,9 +327,7 @@ public class FailoverRegionTest extends TestLogger {
 			AkkaUtils.getDefaultTimeout(),
 			new InfiniteDelayRestartStrategy(10),
 			new FailoverPipelinedRegionWithDirectExecutor(),
-			scheduler,
-			ExecutionGraph.class.getClassLoader(),
-			null);
+			scheduler);
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -441,9 +435,7 @@ public class FailoverRegionTest extends TestLogger {
 			AkkaUtils.getDefaultTimeout(),
 			restartStrategy,
 			new FailoverPipelinedRegionWithDirectExecutor(),
-			scheduler,
-			ExecutionGraph.class.getClassLoader(),
-			null);
+			scheduler);
 		try {
 			eg.attachJobGraph(ordered);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
index b1d6692..d8f0309 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
@@ -168,9 +168,7 @@ public class GlobalModVersionTest {
 			Time.seconds(10),
 			new InfiniteDelayRestartStrategy(),
 			new CustomStrategy(failoverStrategy),
-			slotProvider,
-			getClass().getClassLoader(),
-			null);
+			slotProvider);
 
 		JobVertex jv = new JobVertex("test vertex");
 		jv.setInvokableClass(NoOpInvokable.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
index 9d924c9..33456f7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
@@ -291,9 +291,7 @@ public class IndividualRestartsConcurrencyTest {
 			Time.seconds(10),
 			restartStrategy,
 			failoverStrategy,
-			slotProvider,
-			getClass().getClassLoader(),
-			null);
+			slotProvider);
 
 		JobVertex jv = new JobVertex("test vertex");
 		jv.setInvokableClass(NoOpInvokable.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
index 93c163b..c78e193 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
@@ -303,11 +303,13 @@ public class PipelinedRegionFailoverConcurrencyTest {
 			SlotProvider slotProvider,
 			int parallelism) throws Exception {
 
+		final JobInformation jobInformation = new DummyJobInformation(
+			jid,
+			"test job");
+
 		// build a simple execution graph with on job vertex, parallelism 2
 		final ExecutionGraph graph = new ExecutionGraph(
-			new DummyJobInformation(
-				jid,
-				"test job"),
+			jobInformation,
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
 			Time.seconds(10),

http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
index edb39e9..8198df5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
@@ -89,10 +89,12 @@ public class RestartPipelinedRegionStrategyTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
         Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
+        final JobInformation jobInformation = new DummyJobInformation(
+			jobId,
+			jobName);
+
 		ExecutionGraph eg = new ExecutionGraph(
-			new DummyJobInformation(
-				jobId,
-				jobName),
+			jobInformation,
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
 			AkkaUtils.getDefaultTimeout(),
@@ -171,10 +173,12 @@ public class RestartPipelinedRegionStrategyTest {
         List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
         Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
+        final JobInformation jobInformation = new DummyJobInformation(
+			jobId,
+			jobName);
+
 		ExecutionGraph eg = new ExecutionGraph(
-			new DummyJobInformation(
-				jobId,
-				jobName),
+			jobInformation,
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
 			AkkaUtils.getDefaultTimeout(),
@@ -258,10 +262,12 @@ public class RestartPipelinedRegionStrategyTest {
         List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
         Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
+		final JobInformation jobInformation = new DummyJobInformation(
+			jobId,
+			jobName);
+
 		ExecutionGraph eg = new ExecutionGraph(
-			new DummyJobInformation(
-				jobId,
-				jobName),
+			jobInformation,
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
 			AkkaUtils.getDefaultTimeout(),
@@ -336,10 +342,12 @@ public class RestartPipelinedRegionStrategyTest {
         List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4));
 
         Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
+		final JobInformation jobInformation = new DummyJobInformation(
+			jobId,
+			jobName);
+
         ExecutionGraph eg = new ExecutionGraph(
-        	new DummyJobInformation(
-        		jobId,
-				jobName),
+        	jobInformation,
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
 			AkkaUtils.getDefaultTimeout(),

http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index e72ddf7..b2e63be 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionEdge;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -140,10 +141,12 @@ public class RescalePartitionerTest extends TestLogger {
 		assertEquals(4, mapVertex.getParallelism());
 		assertEquals(2, sinkVertex.getParallelism());
 
+		final JobInformation jobInformation = new DummyJobInformation(
+			jobId,
+			jobName);
+
 		ExecutionGraph eg = new ExecutionGraph(
-			new DummyJobInformation(
-				jobId,
-				jobName),
+			jobInformation,
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
 			AkkaUtils.getDefaultTimeout(),