You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/24 16:09:35 UTC

[2/6] flink git commit: [FLINK-6046] offload large data from the TaskDeploymentDescriptor to the BlobServer

[FLINK-6046] offload large data from the TaskDeploymentDescriptor to the BlobServer

This only includes potentially big parts, i.e. serializedJobInformation and
serializedTaskInformation, which are both offloaded only once for all parallel
instances.

- adds a configurable akka.rpc.offload.minsize threshold for large data
- serialized task information is uploaded only once per task, irrespective of
  the parallelism

[FLINK-6046][tests] added an integration test with a job with big payload

This verifies that uploading a job with a payload of 100MB is successful.

NOTE: This only works after also implementing JobGraph offloading during
submission or akka.framesize will already limit it when sending it form the
client to the jobmanager.

This closes #4412.

Set min offloading size to 1 MiB


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a9df74e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a9df74e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a9df74e

Branch: refs/heads/master
Commit: 7a9df74eaff94601abf1caf974eb88d347d34cfe
Parents: 0dd5d70
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Oct 6 12:02:54 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Oct 24 18:08:30 2017 +0200

----------------------------------------------------------------------
 docs/ops/config.md                              |   2 +
 .../flink/configuration/BlobServerOptions.java  |   6 +
 .../flink/configuration/JobManagerOptions.java  |   7 +
 .../org/apache/flink/runtime/blob/BlobKey.java  |   3 +-
 .../apache/flink/runtime/blob/BlobServer.java   |   9 +
 .../deployment/TaskDeploymentDescriptor.java    | 181 ++++++++++++++++++-
 .../runtime/executiongraph/ExecutionGraph.java  | 130 ++++++++++---
 .../executiongraph/ExecutionGraphBuilder.java   |   7 +-
 .../executiongraph/ExecutionJobVertex.java      | 110 ++++++++---
 .../runtime/executiongraph/ExecutionVertex.java |  38 +++-
 .../flink/runtime/jobmaster/JobMaster.java      |   1 +
 .../runtime/taskexecutor/TaskExecutor.java      |  37 ++--
 .../flink/runtime/jobmanager/JobManager.scala   |   1 +
 .../flink/runtime/taskmanager/TaskManager.scala |  15 +-
 .../CheckpointSettingsSerializableTest.java     |   1 +
 ...ExecutionGraphCheckpointCoordinatorTest.java |   3 +-
 .../TaskDeploymentDescriptorTest.java           |   5 +-
 .../ExecutionGraphDeploymentTest.java           | 124 +++++++++----
 ...ecutionGraphDeploymentWithBlobCacheTest.java |  59 ++++++
 ...cutionGraphDeploymentWithBlobServerTest.java |  99 ++++++++++
 .../ExecutionGraphSchedulingTest.java           |   1 +
 .../executiongraph/ExecutionGraphTestUtils.java |  47 ++++-
 .../ExecutionVertexLocalityTest.java            |   1 +
 .../executiongraph/FailoverRegionTest.java      |  12 +-
 .../executiongraph/GlobalModVersionTest.java    |   3 +-
 .../IndividualRestartsConcurrencyTest.java      |   3 +-
 .../PipelinedRegionFailoverConcurrencyTest.java |   3 +-
 .../RestartPipelinedRegionStrategyTest.java     |  12 +-
 .../PipelinedFailoverRegionBuildingTest.java    |   1 +
 .../flink/runtime/jobmaster/JobMasterTest.java  |  23 ++-
 .../runtime/taskexecutor/TaskExecutorTest.java  |  10 +-
 .../runtime/taskmanager/TaskManagerTest.java    |   5 +-
 .../testingUtils/TestingTaskManagerLike.scala   |  24 +--
 .../partitioner/RescalePartitionerTest.java     |   3 +-
 .../runtime/BigUserProgramJobSubmitITCase.java  | 120 ++++++++++++
 35 files changed, 938 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index af14d5f..d476492 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -309,6 +309,8 @@ The following parameters configure Flink's JobManager and TaskManagers.
 
 - `taskmanager.exit-on-fatal-akka-error`: Whether the TaskManager shall be terminated in case of a fatal Akka error (quarantining event). (DEFAULT: **false**)
 
+- `jobmanager.tdd.offload.minsize`: Maximum size of of the `TaskDeploymentDescriptor`'s serialized task and job information to still transmit them via RPC. Larger blobs may be offloaded to the BLOB server. (DEFAULT: **1 KiB**).
+
 ### Distributed Coordination (via Akka)
 
 - `akka.ask.timeout`: Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you should try to increase this value. Timeouts can be caused by slow machines or a congested network. The timeout value requires a time-unit specifier (ms/s/min/h/d) (DEFAULT: **10 s**).

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
index 019580a..20b7303 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
@@ -87,4 +87,10 @@ public class BlobServerOptions {
 		key("blob.service.cleanup.interval")
 			.defaultValue(3_600L) // once per hour
 			.withDeprecatedKeys("library-cache-manager.cleanup.interval");
+
+	/**
+	 * The minimum size for messages to be offloaded to the BlobServer.
+	 */
+	public static final ConfigOption<Integer> OFFLOAD_MINSIZE = key("blob.offload.minsize")
+		.defaultValue(1_024 * 1_024); // 1MiB by default
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index ef3306e..a39927d 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -96,6 +96,13 @@ public class JobManagerOptions {
 		key("jobmanager.archive.fs.dir")
 			.noDefaultValue();
 
+	/**
+	 * The maximum size of the <tt>TaskDeploymentDescriptor</tt>'s serialized task and job
+	 * information to still transmit them via RPC. Larger blobs may be offloaded to the BLOB server.
+	 */
+	public static final ConfigOption<Integer> TDD_OFFLOAD_MINSIZE = key("jobmanager.tdd.offload.minsize")
+		.defaultValue(1_024); // 1KiB by default
+
 	// ---------------------------------------------------------------------------------------------
 
 	private JobManagerOptions() {

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
index ef2d64d..4b1d498 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
@@ -183,7 +183,8 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
 	 *
 	 * @return a 20 bit hash of the contents the key refers to
 	 */
-	byte[] getHash() {
+	@VisibleForTesting
+	public byte[] getHash() {
 		return key;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index ec6044a..7249c8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -880,6 +880,15 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	}
 
 	/**
+	 * Returns the configuration used by the BLOB server.
+	 *
+	 * @return configuration
+	 */
+	public final Configuration getConfiguration() {
+		return blobServiceConfiguration;
+	}
+
+	/**
 	 * Returns the port on which the server is listening.
 	 *
 	 * @return port on which the server is listening

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 1fa5eb5..0c7e308 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.deployment;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobService;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -26,7 +29,12 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
 import java.io.Serializable;
+import java.nio.file.Files;
 import java.util.Collection;
 
 /**
@@ -36,11 +44,82 @@ public final class TaskDeploymentDescriptor implements Serializable {
 
 	private static final long serialVersionUID = -3233562176034358530L;
 
-	/** Serialized job information. */
-	private final SerializedValue<JobInformation> serializedJobInformation;
+	/**
+	 * Wrapper class for serialized values which may be offloaded to the {@link
+	 * org.apache.flink.runtime.blob.BlobServer} or not.
+	 *
+	 * @param <T>
+	 * 		type of the serialized value
+	 */
+	@SuppressWarnings("unused")
+	public static class MaybeOffloaded<T> implements Serializable {
+		private static final long serialVersionUID = 5977104446396536907L;
+	}
+
+	/**
+	 * A serialized value that is not offloaded to the {@link org.apache.flink.runtime.blob.BlobServer}.
+	 *
+	 * @param <T>
+	 * 		type of the serialized value
+	 */
+	public static class NonOffloaded<T> extends MaybeOffloaded<T> {
+		private static final long serialVersionUID = 4246628617754862463L;
+
+		/**
+		 * The serialized value.
+		 */
+		public SerializedValue<T> serializedValue;
+
+		@SuppressWarnings("unused")
+		public NonOffloaded() {
+		}
+
+		public NonOffloaded(SerializedValue<T> serializedValue) {
+			this.serializedValue = Preconditions.checkNotNull(serializedValue);
+		}
+	}
+
+	/**
+	 * Reference to a serialized value that was offloaded to the {@link
+	 * org.apache.flink.runtime.blob.BlobServer}.
+	 *
+	 * @param <T>
+	 * 		type of the serialized value
+	 */
+	public static class Offloaded<T> extends MaybeOffloaded<T> {
+		private static final long serialVersionUID = 4544135485379071679L;
+
+		/**
+		 * The key of the offloaded value BLOB.
+		 */
+		public PermanentBlobKey serializedValueKey;
+
+		@SuppressWarnings("unused")
+		public Offloaded() {
+		}
 
-	/** Serialized task information. */
-	private final SerializedValue<TaskInformation> serializedTaskInformation;
+		public Offloaded(PermanentBlobKey serializedValueKey) {
+			this.serializedValueKey = Preconditions.checkNotNull(serializedValueKey);
+		}
+	}
+
+	/**
+	 * Serialized job information or <tt>null</tt> if offloaded.
+	 */
+	private MaybeOffloaded<JobInformation> serializedJobInformation;
+
+	/**
+	 * Serialized task information or <tt>null</tt> if offloaded.
+	 */
+	private MaybeOffloaded<TaskInformation> serializedTaskInformation;
+
+	/**
+	 * The ID referencing the job this task belongs to.
+	 *
+	 * <p>NOTE: this is redundant to the information stored in {@link #serializedJobInformation} but
+	 * needed in order to restore offloaded data.</p>
+	 */
+	private final JobID jobId;
 
 	/** The ID referencing the attempt to execute the task. */
 	private final ExecutionAttemptID executionId;
@@ -67,8 +146,9 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	private final TaskStateSnapshot taskStateHandles;
 
 	public TaskDeploymentDescriptor(
-			SerializedValue<JobInformation> serializedJobInformation,
-			SerializedValue<TaskInformation> serializedTaskInformation,
+			JobID jobId,
+			MaybeOffloaded<JobInformation> serializedJobInformation,
+			MaybeOffloaded<TaskInformation> serializedTaskInformation,
 			ExecutionAttemptID executionAttemptId,
 			AllocationID allocationId,
 			int subtaskIndex,
@@ -78,8 +158,11 @@ public final class TaskDeploymentDescriptor implements Serializable {
 			Collection<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors,
 			Collection<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors) {
 
+		this.jobId = Preconditions.checkNotNull(jobId);
+
 		this.serializedJobInformation = Preconditions.checkNotNull(serializedJobInformation);
 		this.serializedTaskInformation = Preconditions.checkNotNull(serializedTaskInformation);
+
 		this.executionId = Preconditions.checkNotNull(executionAttemptId);
 		this.allocationId = Preconditions.checkNotNull(allocationId);
 
@@ -101,19 +184,46 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	/**
 	 * Return the sub task's serialized job information.
 	 *
-	 * @return serialized job information
+	 * @return serialized job information (may be <tt>null</tt> before a call to {@link
+	 * #loadBigData(PermanentBlobService)}).
 	 */
+	@Nullable
 	public SerializedValue<JobInformation> getSerializedJobInformation() {
-		return serializedJobInformation;
+		if (serializedJobInformation instanceof NonOffloaded) {
+			NonOffloaded<JobInformation> jobInformation =
+				(NonOffloaded<JobInformation>) serializedJobInformation;
+			return jobInformation.serializedValue;
+		} else {
+			throw new IllegalStateException(
+				"Trying to work with offloaded serialized job information.");
+		}
 	}
 
 	/**
 	 * Return the sub task's serialized task information.
 	 *
-	 * @return serialized task information
+	 * @return serialized task information (may be <tt>null</tt> before a call to {@link
+	 * #loadBigData(PermanentBlobService)}).
 	 */
+	@Nullable
 	public SerializedValue<TaskInformation> getSerializedTaskInformation() {
-		return serializedTaskInformation;
+		if (serializedJobInformation instanceof NonOffloaded) {
+			NonOffloaded<TaskInformation> jobInformation =
+				(NonOffloaded<TaskInformation>) serializedTaskInformation;
+			return jobInformation.serializedValue;
+		} else {
+			throw new IllegalStateException(
+				"Trying to work with offloaded serialized job information.");
+		}
+	}
+
+	/**
+	 * Returns the task's job ID.
+	 *
+	 * @return the job ID this task belongs to
+	 */
+	public JobID getJobId() {
+		return jobId;
 	}
 
 	public ExecutionAttemptID getExecutionAttemptId() {
@@ -161,6 +271,57 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		return allocationId;
 	}
 
+	/**
+	 * Loads externalized data from the BLOB store back to the object.
+	 *
+	 * @param blobService
+	 * 		the blob store to use (may be <tt>null</tt> if {@link #serializedJobInformation} and {@link
+	 * 		#serializedTaskInformation} are non-<tt>null</tt>)
+	 *
+	 * @throws IOException
+	 * 		during errors retrieving or reading the BLOBs
+	 * @throws ClassNotFoundException
+	 * 		Class of a serialized object cannot be found.
+	 */
+	public void loadBigData(@Nullable PermanentBlobService blobService)
+			throws IOException, ClassNotFoundException {
+
+		// re-integrate offloaded job info from blob
+		// here, if this fails, we need to throw the exception as there is no backup path anymore
+		if (serializedJobInformation instanceof Offloaded) {
+			PermanentBlobKey jobInfoKey = ((Offloaded<JobInformation>) serializedJobInformation).serializedValueKey;
+
+			Preconditions.checkNotNull(blobService);
+
+			final File dataFile = blobService.getFile(jobId, jobInfoKey);
+			// NOTE: Do not delete the job info BLOB since it may be needed again during recovery.
+			//       (it is deleted automatically on the BLOB server and cache when the job
+			//       enters a terminal state)
+			SerializedValue<JobInformation> serializedValue =
+				SerializedValue.fromBytes(Files.readAllBytes(dataFile.toPath()));
+			serializedJobInformation = new NonOffloaded<>(serializedValue);
+		}
+
+		// re-integrate offloaded task info from blob
+		if (serializedTaskInformation instanceof Offloaded) {
+			PermanentBlobKey taskInfoKey = ((Offloaded<TaskInformation>) serializedTaskInformation).serializedValueKey;
+
+			Preconditions.checkNotNull(blobService);
+
+			final File dataFile = blobService.getFile(jobId, taskInfoKey);
+			// NOTE: Do not delete the task info BLOB since it may be needed again during recovery.
+			//       (it is deleted automatically on the BLOB server and cache when the job
+			//       enters a terminal state)
+			SerializedValue<TaskInformation> serializedValue =
+				SerializedValue.fromBytes(Files.readAllBytes(dataFile.toPath()));
+			serializedTaskInformation = new NonOffloaded<>(serializedValue);
+		}
+
+		// make sure that the serialized job and task information fields are filled
+		Preconditions.checkNotNull(serializedJobInformation);
+		Preconditions.checkNotNull(serializedTaskInformation);
+	}
+
 	@Override
 	public String toString() {
 		return String.format("TaskDeploymentDescriptor [execution id: %s, attempt: %d, " +

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index ea42724..c0004f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -27,10 +27,12 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.StoppingException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -75,6 +77,7 @@ import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
@@ -124,20 +127,20 @@ import static org.apache.flink.util.Preconditions.checkState;
  *         about deployment of tasks and updates in the task status always use the ExecutionAttemptID to
  *         address the message receiver.</li>
  * </ul>
- * 
+ *
  * <h2>Global and local failover</h2>
- * 
+ *
  * The Execution Graph has two failover modes: <i>global failover</i> and <i>local failover</i>.
- * 
+ *
  * <p>A <b>global failover</b> aborts the task executions for all vertices and restarts whole
  * data flow graph from the last completed checkpoint. Global failover is considered the
  * "fallback strategy" that is used when a local failover is unsuccessful, or when a issue is
  * found in the state of the ExecutionGraph that could mark it as inconsistent (caused by a bug).
- * 
+ *
  * <p>A <b>local failover</b> is triggered when an individual vertex execution (a task) fails.
  * The local failover is coordinated by the {@link FailoverStrategy}. A local failover typically
  * attempts to restart as little as possible, but as much as necessary.
- * 
+ *
  * <p>Between local- and global failover, the global failover always takes precedence, because it
  * is the core mechanism that the ExecutionGraph relies on to bring back consistency. The
  * guard that, the ExecutionGraph maintains a <i>global modification version</i>, which is incremented
@@ -145,7 +148,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * failure). Local failover is always scoped by the modification version that the execution graph
  * had when the failover was triggered. If a new global modification version is reached during
  * local failover (meaning there is a concurrent global failover), the failover strategy has to
- * yield before the global failover.  
+ * yield before the global failover.
  */
 public class ExecutionGraph implements AccessExecutionGraph, Archiveable<ArchivedExecutionGraph> {
 
@@ -176,6 +179,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 */
 	private final SerializedValue<JobInformation> serializedJobInformation;
 
+	/**
+	 * The key of the offloaded job information BLOB containing {@link #serializedJobInformation} or
+	 * <tt>null</tt> if not offloaded.
+	 */
+	@Nullable
+	private final PermanentBlobKey jobInformationBlobKey;
+
 	/** The executor which is used to execute futures. */
 	private final ScheduledExecutorService futureExecutor;
 
@@ -235,6 +245,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/** The total number of vertices currently in the execution graph */
 	private int numVerticesTotal;
 
+	/** Blob server reference for offloading large RPC messages. */
+	@Nullable
+	private final BlobServer blobServer;
+
 	// ------ Configuration of the Execution -------
 
 	/** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able
@@ -294,6 +308,25 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			Time timeout,
 			RestartStrategy restartStrategy,
 			SlotProvider slotProvider) {
+
+		this(futureExecutor, ioExecutor, jobId, jobName, jobConfig, serializedConfig, timeout,
+			restartStrategy, slotProvider, null);
+	}
+
+	/**
+	 * This constructor is for tests only, because it does not include class loading information.
+	 */
+	ExecutionGraph(
+			ScheduledExecutorService futureExecutor,
+			Executor ioExecutor,
+			JobID jobId,
+			String jobName,
+			Configuration jobConfig,
+			SerializedValue<ExecutionConfig> serializedConfig,
+			Time timeout,
+			RestartStrategy restartStrategy,
+			SlotProvider slotProvider,
+			@Nullable BlobServer blobServer) {
 		this(
 			futureExecutor,
 			ioExecutor,
@@ -307,7 +340,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			Collections.emptyList(),
 			Collections.emptyList(),
 			slotProvider,
-			ExecutionGraph.class.getClassLoader());
+			ExecutionGraph.class.getClassLoader(),
+			blobServer
+		);
 	}
 
 	public ExecutionGraph(
@@ -323,7 +358,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			List<PermanentBlobKey> requiredJarFiles,
 			List<URL> requiredClasspaths,
 			SlotProvider slotProvider,
-			ClassLoader userClassLoader) {
+			ClassLoader userClassLoader,
+			@Nullable BlobServer blobServer) {
 
 		checkNotNull(futureExecutor);
 		checkNotNull(jobId);
@@ -379,6 +415,52 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		// is ready by the time the failover strategy sees it
 		this.failoverStrategy = checkNotNull(failoverStrategyFactory.create(this), "null failover strategy");
 		LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName());
+
+		this.blobServer = blobServer;
+		this.jobInformationBlobKey = tryOffLoadJobInformation();
+	}
+
+	/**
+	 * Tries to store {@link #serializedJobInformation} and in the graph's {@link
+	 * ExecutionGraph#blobServer} (if not <tt>null</tt>) so that RPC messages do not need to include
+	 * it.
+	 *
+	 * @return the BLOB key of the uploaded job information or <tt>null</tt> if the upload failed
+	 */
+	@Nullable
+	private PermanentBlobKey tryOffLoadJobInformation() {
+		if (blobServer == null) {
+			return null;
+		}
+
+		// If the serialized job information inside serializedJobInformation is larger than this,
+		// we try to offload it to the BLOB server.
+		final int rpcOffloadMinSize =
+			blobServer.getConfiguration().getInteger(JobManagerOptions.TDD_OFFLOAD_MINSIZE);
+
+		if (serializedJobInformation.getByteArray().length > rpcOffloadMinSize) {
+			LOG.info("Storing job {} information at the BLOB server", getJobID());
+
+			// TODO: do not overwrite existing job info and thus speed up recovery?
+			try {
+				return blobServer.putPermanent(getJobID(), serializedJobInformation.getByteArray());
+			} catch (IOException e) {
+				LOG.warn("Failed to offload job " + getJobID() + " information data to BLOB store", e);
+			}
+		}
+
+		return null;
+	}
+
+	/**
+	 * Returns the key of the offloaded job information BLOB containing {@link
+	 * #serializedJobInformation}.
+	 *
+	 * @return the BLOB key or <tt>null</tt> if not offloaded
+	 */
+	@Nullable
+	public PermanentBlobKey getJobInformationBlobKey() {
+		return jobInformationBlobKey;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -603,7 +685,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * Gets the number of full restarts that the execution graph went through.
 	 * If a full restart recovery is currently pending, this recovery is included in the
 	 * count.
-	 * 
+	 *
 	 * @return The number of full restarts so far
 	 */
 	public long getNumberOfFullRestarts() {
@@ -679,6 +761,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		return this.stateTimestamps[status.ordinal()];
 	}
 
+	public final BlobServer getBlobServer() {
+		return blobServer;
+	}
+
 	/**
 	 * Returns the ExecutionContext associated with this ExecutionGraph.
 	 *
@@ -829,8 +915,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	}
 
 	/**
-	 * 
-	 * 
+	 *
+	 *
 	 * @param slotProvider  The resource provider from which the slots are allocated
 	 * @param timeout       The maximum time that the deployment may take, before a
 	 *                      TimeoutException is thrown.
@@ -923,7 +1009,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 						// we catch everything here to make sure cleanup happens and the
 						// ExecutionGraph notices the error
 
-						// we need to to release all slots before going into recovery! 
+						// we need to to release all slots before going into recovery!
 						try {
 							ExecutionGraphUtils.releaseAllSlotsSilently(resources);
 						}
@@ -934,7 +1020,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 					// Wouldn't it be nice if we could return an actual Void object?
 					// return (Void) Unsafe.getUnsafe().allocateInstance(Void.class);
-					return null; 
+					return null;
 				},
 				futureExecutor);
 
@@ -1084,12 +1170,12 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/**
 	 * Fails the execution graph globally. This failure will not be recovered by a specific
 	 * failover strategy, but results in a full restart of all tasks.
-	 * 
+	 *
 	 * <p>This global failure is meant to be triggered in cases where the consistency of the
 	 * execution graph' state cannot be guaranteed any more (for example when catching unexpected
 	 * exceptions that indicate a bug or an unexpected call race), and where a full restart is the
 	 * safe way to get consistency back.
-	 * 
+	 *
 	 * @param errorInfo ErrorInfo containing the exception that caused the failure.
 	 */
 	public void failGlobal(ErrorInfo errorInfo) {
@@ -1427,7 +1513,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 					return true;
 				}
 				else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failureCause)) {
-					final String cause1 = isFailureCauseAllowingRestart ? null :  
+					final String cause1 = isFailureCauseAllowingRestart ? null :
 							"a type of SuppressRestartsException was thrown";
 					final String cause2 = isRestartStrategyAllowingRestart ? null :
 						"the restart strategy prevented it";
@@ -1485,25 +1571,25 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				switch (state.getExecutionState()) {
 					case RUNNING:
 						return attempt.switchToRunning();
-	
+
 					case FINISHED:
 						// this deserialization is exception-free
 						accumulators = deserializeAccumulators(state);
 						attempt.markFinished(accumulators, state.getIOMetrics());
 						return true;
-	
+
 					case CANCELED:
 						// this deserialization is exception-free
 						accumulators = deserializeAccumulators(state);
 						attempt.cancelingComplete(accumulators, state.getIOMetrics());
 						return true;
-	
+
 					case FAILED:
 						// this deserialization is exception-free
 						accumulators = deserializeAccumulators(state);
 						attempt.markFailed(state.getError(userClassLoader), accumulators, state.getIOMetrics());
 						return true;
-	
+
 					default:
 						// we mark as failed and return false, which triggers the TaskManager
 						// to remove the task
@@ -1526,9 +1612,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 	/**
 	 * Deserializes accumulators from a task state update.
-	 * 
+	 *
 	 * <p>This method never throws an exception!
-	 * 
+	 *
 	 * @param state The task execution state from which to deserialize the accumulators.
 	 * @return The deserialized accumulators, of null, if there are no accumulators or an error occurred.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index ba66a2b..3b72505 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
@@ -90,6 +91,7 @@ public class ExecutionGraphBuilder {
 			RestartStrategy restartStrategy,
 			MetricGroup metrics,
 			int parallelismForAutoMax,
+			BlobServer blobServer,
 			Logger log)
 		throws JobExecutionException, JobException {
 
@@ -98,7 +100,7 @@ public class ExecutionGraphBuilder {
 		final String jobName = jobGraph.getName();
 		final JobID jobId = jobGraph.getJobID();
 
-		final FailoverStrategy.Factory failoverStrategy = 
+		final FailoverStrategy.Factory failoverStrategy =
 				FailoverStrategyLoader.loadFailoverStrategy(jobManagerConfig, log);
 
 		// create a new execution graph, if none exists so far
@@ -116,7 +118,8 @@ public class ExecutionGraphBuilder {
 						jobGraph.getUserJarBlobKeys(),
 						jobGraph.getClasspaths(),
 						slotProvider,
-						classLoader);
+						classLoader,
+						blobServer);
 
 		// set the basic properties
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index e6d49d2..33a4359 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -32,6 +32,8 @@ import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
@@ -46,9 +48,10 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
-
 import org.slf4j.Logger;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -59,9 +62,9 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 /**
- * An {@code ExecutionJobVertex} is part of the {@link ExecutionGraph}, and the peer 
+ * An {@code ExecutionJobVertex} is part of the {@link ExecutionGraph}, and the peer
  * to the {@link JobVertex}.
- * 
+ *
  * <p>The {@code ExecutionJobVertex} corresponds to a parallelized operation. It
  * contains an {@link ExecutionVertex} for each parallel instance of that operation.
  */
@@ -95,7 +98,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	 * The ID's are in the same order as {@link ExecutionJobVertex#operatorIDs}.
 	 */
 	private final List<OperatorID> userDefinedOperatorIds;
-	
+
 	private final ExecutionVertex[] taskVertices;
 
 	private final IntermediateResult[] producedDataSets;
@@ -121,6 +124,13 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	 */
 	private SerializedValue<TaskInformation> serializedTaskInformation;
 
+	/**
+	 * The key of the offloaded task information BLOB containing {@link #serializedTaskInformation}
+	 * or <tt>null</tt> if not offloaded.
+	 */
+	@Nullable
+	private PermanentBlobKey taskInformationBlobKey = null;
+
 	private InputSplitAssigner splitAssigner;
 
 	/**
@@ -169,7 +179,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		this.taskVertices = new ExecutionVertex[numTaskVertices];
 		this.operatorIDs = Collections.unmodifiableList(jobVertex.getOperatorIDs());
 		this.userDefinedOperatorIds = Collections.unmodifiableList(jobVertex.getUserDefinedOperatorIDs());
-		
+
 		this.inputs = new ArrayList<>(jobVertex.getInputs().size());
 		
 		// take the sharing group
@@ -351,28 +361,80 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 
 	public SerializedValue<TaskInformation> getSerializedTaskInformation() throws IOException {
 
-		if (null == serializedTaskInformation) {
+		// only one thread should offload the task information, so let's also let only one thread
+		// serialize the task information!
+		synchronized (stateMonitor) {
+			if (null == serializedTaskInformation) {
 
-			int parallelism = getParallelism();
-			int maxParallelism = getMaxParallelism();
+				int parallelism = getParallelism();
+				int maxParallelism = getMaxParallelism();
 
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Creating task information for " + generateDebugString());
-			}
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Creating task information for " + generateDebugString());
+				}
 
-			serializedTaskInformation = new SerializedValue<>(
+				serializedTaskInformation = new SerializedValue<>(
 					new TaskInformation(
-							jobVertex.getID(),
-							jobVertex.getName(),
-							parallelism,
-							maxParallelism,
-							jobVertex.getInvokableClassName(),
-							jobVertex.getConfiguration()));
+						jobVertex.getID(),
+						jobVertex.getName(),
+						parallelism,
+						maxParallelism,
+						jobVertex.getInvokableClassName(),
+						jobVertex.getConfiguration()));
+
+				taskInformationBlobKey = tryOffLoadTaskInformation();
+			}
 		}
 
 		return serializedTaskInformation;
 	}
 
+	/**
+	 * Returns the key of the offloaded task information BLOB containing {@link
+	 * #serializedTaskInformation}.
+	 * <p>
+	 * This may be true after the first call to {@link #getSerializedTaskInformation()}.
+	 *
+	 * @return the BLOB key or <tt>null</tt> if not offloaded
+	 */
+	@Nullable
+	public PermanentBlobKey getTaskInformationBlobKey() {
+		return taskInformationBlobKey;
+	}
+
+	/**
+	 * Tries to store {@link #serializedTaskInformation} and in the graph's {@link
+	 * ExecutionGraph#blobServer} (if not <tt>null</tt>) so that RPC messages do not need to include
+	 * it.
+	 *
+	 * @return the BLOB key of the uploaded task information or <tt>null</tt> if the upload failed
+	 */
+	@Nullable
+	private PermanentBlobKey tryOffLoadTaskInformation() {
+		BlobServer blobServer = graph.getBlobServer();
+		if (blobServer == null) {
+			return null;
+		}
+
+		// If the serialized task information inside #serializedTaskInformation is larger than this,
+		// we try to offload it to the BLOB server.
+		final int rpcOffloadMinSize =
+			blobServer.getConfiguration().getInteger(JobManagerOptions.TDD_OFFLOAD_MINSIZE);
+
+		if (serializedTaskInformation.getByteArray().length > rpcOffloadMinSize) {
+			LOG.info("Storing task {} information at the BLOB server", getJobVertexId());
+
+			// TODO: do not overwrite existing task info and thus speed up recovery?
+			try {
+				return blobServer.putPermanent(getJobId(), serializedTaskInformation.getByteArray());
+			} catch (IOException e) {
+				LOG.warn("Failed to offload task " + getJobVertexId() + " information data to BLOB store", e);
+			}
+		}
+
+		return null;
+	}
+
 	@Override
 	public ExecutionState getAggregateState() {
 		int[] num = new int[ExecutionState.values().length];
@@ -504,7 +566,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 
 	/**
 	 * Cancels all currently running vertex executions.
-	 * 
+	 *
 	 * @return A future that is complete once all tasks have canceled.
 	 */
 	public CompletableFuture<Void> cancelWithFuture() {
@@ -588,21 +650,21 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 
 	/**
 	 * A utility function that computes an "aggregated" state for the vertex.
-	 * 
+	 *
 	 * <p>This state is not used anywhere in the  coordination, but can be used for display
 	 * in dashboards to as a summary for how the particular parallel operation represented by
 	 * this ExecutionJobVertex is currently behaving.
-	 * 
+	 *
 	 * <p>For example, if at least one parallel task is failed, the aggregate state is failed.
 	 * If not, and at least one parallel task is cancelling (or cancelled), the aggregate state
 	 * is cancelling (or cancelled). If all tasks are finished, the aggregate state is finished,
 	 * and so on.
-	 * 
+	 *
 	 * @param verticesPerState The number of vertices in each state (indexed by the ordinal of
 	 *                         the ExecutionState values).
 	 * @param parallelism The parallelism of the ExecutionJobVertex
-	 * 
-	 * @return The aggregate state of this ExecutionJobVertex. 
+	 *
+	 * @return The aggregate state of this ExecutionJobVertex.
 	 */
 	public static ExecutionState getAggregateJobVertexState(int[] verticesPerState, int parallelism) {
 		if (verticesPerState == null || verticesPerState.length != ExecutionState.values().length) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 9aac133..17ad3c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -47,7 +48,6 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EvictingBoundedList;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
 
@@ -770,19 +770,39 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			consumedPartitions.add(new InputGateDeploymentDescriptor(resultId, partitionType, queueToRequest, partitions));
 		}
 
-		SerializedValue<JobInformation> serializedJobInformation = getExecutionGraph().getSerializedJobInformation();
-		SerializedValue<TaskInformation> serializedJobVertexInformation = null;
+		TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> serializedJobInformation;
+		{
+			PermanentBlobKey jobInfoBlobKey = getExecutionGraph().getJobInformationBlobKey();
+			if (jobInfoBlobKey != null) {
+				serializedJobInformation =
+					new TaskDeploymentDescriptor.Offloaded<>(jobInfoBlobKey);
+			} else {
+				serializedJobInformation = new TaskDeploymentDescriptor.NonOffloaded<>(
+					getExecutionGraph().getSerializedJobInformation());
+			}
+		}
 
-		try {
-			serializedJobVertexInformation = jobVertex.getSerializedTaskInformation();
-		} catch (IOException e) {
-			throw new ExecutionGraphException(
-					"Could not create a serialized JobVertexInformation for " + jobVertex.getJobVertexId(), e);
+		TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> serializedTaskInformation;
+		{
+			PermanentBlobKey taskInfoBlobKey = jobVertex.getTaskInformationBlobKey();
+			if (taskInfoBlobKey != null) {
+				serializedTaskInformation = new TaskDeploymentDescriptor.Offloaded<>(taskInfoBlobKey);
+			} else {
+				try {
+					serializedTaskInformation = new TaskDeploymentDescriptor.NonOffloaded<>(
+						jobVertex.getSerializedTaskInformation());
+				} catch (IOException e) {
+					throw new ExecutionGraphException(
+						"Could not create a serialized JobVertexInformation for " +
+							jobVertex.getJobVertexId(), e);
+				}
+			}
 		}
 
 		return new TaskDeploymentDescriptor(
+			getJobId(),
 			serializedJobInformation,
-			serializedJobVertexInformation,
+			serializedTaskInformation,
 			executionId,
 			targetSlot.getAllocatedSlot().getSlotAllocationId(),
 			subTaskIndex,

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 19bf2a5..f60f561 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -288,6 +288,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			restartStrategy,
 			jobMetricGroup,
 			-1,
+			blobServer,
 			log);
 
 		// register self as job status change listener

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 76cfb50..d6295b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -307,17 +307,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			Time timeout) {
 
 		try {
-			// first, deserialize the pre-serialized information
-			final JobInformation jobInformation;
-			final TaskInformation taskInformation;
-			try {
-				jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
-				taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
-			} catch (IOException | ClassNotFoundException e) {
-				throw new TaskSubmissionException("Could not deserialize the job or task information.", e);
-			}
-
-			final JobID jobId = jobInformation.getJobId();
+			final JobID jobId = tdd.getJobId();
 			final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId);
 
 			if (jobManagerConnection == null) {
@@ -344,6 +334,30 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 				throw new TaskSubmissionException(message);
 			}
 
+			// re-integrate offloaded data:
+			BlobCacheService blobCache = jobManagerConnection.getBlobService();
+			try {
+				tdd.loadBigData(blobCache.getPermanentBlobService());
+			} catch (IOException | ClassNotFoundException e) {
+				throw new TaskSubmissionException("Could not re-integrate offloaded TaskDeploymentDescriptor data.", e);
+			}
+
+			// deserialize the pre-serialized information
+			final JobInformation jobInformation;
+			final TaskInformation taskInformation;
+			try {
+				jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
+				taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
+			} catch (IOException | ClassNotFoundException e) {
+				throw new TaskSubmissionException("Could not deserialize the job or task information.", e);
+			}
+
+			if (!jobId.equals(jobInformation.getJobId())) {
+				throw new TaskSubmissionException(
+					"Inconsistent job ID information inside TaskDeploymentDescriptor (" +
+						tdd.getJobId() + " vs. " + jobInformation.getJobId() + ")");
+			}
+
 			TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(
 				jobInformation.getJobId(),
 				jobInformation.getJobName(),
@@ -361,6 +375,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 			TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
 			CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
+
 			BlobCacheService blobService = jobManagerConnection.getBlobService();
 			LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager();
 			ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier();

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8c3cf8e..bbb8275 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1332,6 +1332,7 @@ class JobManager(
           restartStrategy,
           jobMetrics,
           numSlots,
+          blobServer,
           log.logger)
         
         if (registerNewGraph) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 0f79d51..75708d1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1137,16 +1137,29 @@ class TaskManager(
       }
 
       // create the task. this does not grab any TaskManager resources or download
-      // and libraries - the operation does not block
+      // any libraries except for offloaded TaskDeploymentDescriptor data which
+      // was too big for the RPC - the operation may only block for the latter
 
       val jobManagerGateway = new AkkaActorGateway(jobManagerActor, leaderSessionID.orNull)
 
+      try {
+        tdd.loadBigData(blobCache.getPermanentBlobService);
+      } catch {
+        case e @ (_: IOException | _: ClassNotFoundException) =>
+          throw new IOException("Could not deserialize the job information.", e)
+      }
+
       val jobInformation = try {
         tdd.getSerializedJobInformation.deserializeValue(getClass.getClassLoader)
       } catch {
         case e @ (_: IOException | _: ClassNotFoundException) =>
           throw new IOException("Could not deserialize the job information.", e)
       }
+      if (tdd.getJobId != jobInformation.getJobId) {
+        throw new IOException(
+          "Inconsistent job ID information inside TaskDeploymentDescriptor (" +
+          tdd.getJobId + " vs. " + jobInformation.getJobId + ")")
+      }
 
       val taskInformation = try {
         tdd.getSerializedTaskInformation.deserializeValue(getClass.getClassLoader)

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index 84b5774..e500036 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -107,6 +107,7 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
 				new NoRestartStrategy(),
 				new UnregisteredMetricsGroup(),
 				10,
+				null,
 				log);
 
 		assertEquals(1, eg.getCheckpointCoordinator().getNumberOfRegisteredMasterHooks());

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 744fd60..b89ed5d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -93,7 +93,8 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 			Collections.emptyList(),
 			Collections.emptyList(),
 			new Scheduler(TestingUtils.defaultExecutionContext()),
-			ClassLoader.getSystemClassLoader());
+			ClassLoader.getSystemClassLoader(),
+			null);
 
 		executionGraph.enableCheckpointing(
 				100,

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 92613ed..3247024 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -76,8 +76,9 @@ public class TaskDeploymentDescriptorTest {
 			final TaskStateSnapshot taskStateHandles = new TaskStateSnapshot();
 
 			final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(
-				serializedJobInformation,
-				serializedJobVertexInformation,
+				jobID,
+				new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+				new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobVertexInformation),
 				execId,
 				allocationId,
 				indexInSubtaskGroup,

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index bbc232d..3af4171 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -18,22 +18,8 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-import static org.junit.Assert.assertNotEquals;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.time.Time;
@@ -43,6 +29,8 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobService;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -52,14 +40,13 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
@@ -72,8 +59,59 @@ import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static junit.framework.TestCase.assertNull;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link ExecutionGraph} deployment.
+ */
 public class ExecutionGraphDeploymentTest {
 
+	/**
+	 * BLOB server instance to use for the job graph (may be <tt>null</tt>).
+	 */
+	protected BlobServer blobServer = null;
+
+	/**
+	 * Permanent BLOB cache instance to use for the actor gateway that handles the {@link
+	 * TaskDeploymentDescriptor} loading (may be <tt>null</tt>).
+	 */
+	protected PermanentBlobService blobCache = null;
+
+	/**
+	 * Checks that the job information for the given ID has been offloaded successfully (if
+	 * offloading is used).
+	 *
+	 * @param eg           the execution graph that was created
+	 */
+	protected void checkJobOffloaded(ExecutionGraph eg) throws Exception {
+		assertNull(eg.getJobInformationBlobKey());
+	}
+
+	/**
+	 * Checks that the task information for the job vertex has been offloaded successfully (if
+	 * offloading is used).
+	 *
+	 * @param eg           the execution graph that was created
+	 * @param jobVertexId  job vertex ID
+	 */
+	protected void checkTaskOffloaded(ExecutionGraph eg, JobVertexID jobVertexId) throws Exception {
+		assertNull(eg.getJobVertex(jobVertexId).getTaskInformationBlobKey());
+	}
+
 	@Test
 	public void testBuildDeploymentDescriptor() {
 		try {
@@ -106,13 +144,15 @@ public class ExecutionGraphDeploymentTest {
 			ExecutionGraph eg = new ExecutionGraph(
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
-				jobId, 
-				"some job", 
+				jobId,
+				"some job",
 				new Configuration(),
 				new SerializedValue<>(new ExecutionConfig()),
 				AkkaUtils.getDefaultTimeout(),
 				new NoRestartStrategy(),
-				new Scheduler(TestingUtils.defaultExecutionContext()));
+				new Scheduler(TestingUtils.defaultExecutionContext()),
+				blobServer);
+			checkJobOffloaded(eg);
 
 			List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
 
@@ -121,7 +161,10 @@ public class ExecutionGraphDeploymentTest {
 			ExecutionJobVertex ejv = eg.getAllVertices().get(jid2);
 			ExecutionVertex vertex = ejv.getTaskVertices()[3];
 
-			ExecutionGraphTestUtils.SimpleActorGateway instanceGateway = new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext());
+			ExecutionGraphTestUtils.SimpleActorGatewayWithTDD instanceGateway =
+				new ExecutionGraphTestUtils.SimpleActorGatewayWithTDD(
+					TestingUtils.directExecutionContext(),
+					blobCache == null ? blobServer : blobCache);
 
 			final Instance instance = getInstance(new ActorTaskManagerGateway(instanceGateway));
 
@@ -132,13 +175,17 @@ public class ExecutionGraphDeploymentTest {
 			vertex.deployToSlot(slot);
 
 			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
+			checkTaskOffloaded(eg, vertex.getJobvertexId());
 
 			TaskDeploymentDescriptor descr = instanceGateway.lastTDD;
 			assertNotNull(descr);
 
-			JobInformation jobInformation = descr.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
-			TaskInformation taskInformation = descr.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
+			JobInformation jobInformation =
+				descr.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
+			TaskInformation taskInformation =
+				descr.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
 
+			assertEquals(jobId, descr.getJobId());
 			assertEquals(jobId, jobInformation.getJobId());
 			assertEquals(jid2, taskInformation.getJobVertexId());
 			assertEquals(3, descr.getSubtaskIndex());
@@ -381,7 +428,9 @@ public class ExecutionGraphDeploymentTest {
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
-			scheduler);
+			scheduler,
+			blobServer);
+		checkJobOffloaded(eg);
 
 		eg.setQueuedSchedulingAllowed(false);
 
@@ -448,16 +497,18 @@ public class ExecutionGraphDeploymentTest {
 
 		// execution graph that executes actions synchronously
 		ExecutionGraph eg = new ExecutionGraph(
-				new DirectScheduledExecutorService(),
-				TestingUtils.defaultExecutor(),
-				jobId,
-				"some job",
-				new Configuration(),
-				new SerializedValue<>(new ExecutionConfig()),
-				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy(),
-				scheduler);
-
+			new DirectScheduledExecutorService(),
+			TestingUtils.defaultExecutor(),
+			jobId, 
+			"some job",
+			new Configuration(),
+			new SerializedValue<>(new ExecutionConfig()),
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy(),
+			scheduler,
+			blobServer);
+		checkJobOffloaded(eg);
+		
 		eg.setQueuedSchedulingAllowed(false);
 
 		List<JobVertex> ordered = Arrays.asList(v1, v2);
@@ -537,6 +588,7 @@ public class ExecutionGraphDeploymentTest {
 			new NoRestartStrategy(),
 			new UnregisteredMetricsGroup(),
 			1,
+			blobServer,
 			LoggerFactory.getLogger(getClass()));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java
new file mode 100644
index 0000000..f1c071e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobCache;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * Tests {@link ExecutionGraph} deployment when offloading job and task information into the BLOB
+ * server.
+ */
+public class ExecutionGraphDeploymentWithBlobCacheTest extends ExecutionGraphDeploymentWithBlobServerTest {
+
+	@Before
+	@Override
+	public void setupBlobServer() throws IOException {
+		Configuration config = new Configuration();
+		// always offload the serialized job and task information
+		config.setInteger(JobManagerOptions.TDD_OFFLOAD_MINSIZE, 0);
+		blobServer = new BlobServer(config, new VoidBlobStore());
+		blobServer.start();
+
+		InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort());
+		blobCache = new PermanentBlobCache(serverAddress, config, new VoidBlobStore());
+	}
+
+	@After
+	@Override
+	public void shutdownBlobServer() throws IOException {
+		if (blobServer != null) {
+			blobServer.close();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java
new file mode 100644
index 0000000..030e18d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.junit.After;
+import org.junit.Before;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+
+/**
+ * Tests {@link ExecutionGraph} deployment when offloading job and task information into the BLOB
+ * server.
+ */
+public class ExecutionGraphDeploymentWithBlobServerTest extends ExecutionGraphDeploymentTest {
+
+	private Set<byte[]> seenHashes = Collections.newSetFromMap(new ConcurrentHashMap<byte[], Boolean>());
+
+	@Before
+	public void setupBlobServer() throws IOException {
+		Configuration config = new Configuration();
+		// always offload the serialized job and task information
+		config.setInteger(JobManagerOptions.TDD_OFFLOAD_MINSIZE, 0);
+		blobServer = Mockito.spy(new BlobServer(config, new VoidBlobStore()));
+
+		seenHashes.clear();
+
+		// verify that we do not upload the same content more than once
+		doAnswer(
+			invocation -> {
+				PermanentBlobKey key = (PermanentBlobKey) invocation.callRealMethod();
+
+				assertTrue(seenHashes.add(key.getHash()));
+
+				return key;
+			}
+		).when(blobServer).putPermanent(any(JobID.class), Matchers.<byte[]>any());
+
+		blobServer.start();
+	}
+
+	@After
+	public void shutdownBlobServer() throws IOException {
+		if (blobServer != null) {
+			blobServer.close();
+		}
+	}
+
+	@Override
+	protected void checkJobOffloaded(ExecutionGraph eg) throws Exception {
+		PermanentBlobKey jobInformationBlobKey = eg.getJobInformationBlobKey();
+		assertNotNull(jobInformationBlobKey);
+
+		// must not throw:
+		blobServer.getFile(eg.getJobID(), jobInformationBlobKey);
+	}
+
+	@Override
+	protected void checkTaskOffloaded(ExecutionGraph eg, JobVertexID jobVertexId) throws Exception {
+		PermanentBlobKey taskInformationBlobKey = eg.getJobVertex(jobVertexId).getTaskInformationBlobKey();
+		assertNotNull(taskInformationBlobKey);
+
+		// must not throw:
+		blobServer.getFile(eg.getJobID(), taskInformationBlobKey);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index b88a928..c8cab9f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -555,6 +555,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 			new NoRestartStrategy(),
 			new UnregisteredMetricsGroup(),
 			1,
+			null,
 			log);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 2daf28f..1f9fa82 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -18,12 +18,15 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import akka.actor.Status;
+
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.PermanentBlobService;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -77,7 +80,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
 /**
- * A collection of utility methods for testing the ExecutionGraph and its related classes. 
+ * A collection of utility methods for testing the ExecutionGraph and its related classes.
  */
 public class ExecutionGraphTestUtils {
 
@@ -89,10 +92,10 @@ public class ExecutionGraphTestUtils {
 
 	/**
 	 * Waits until the job has reached a certain state.
-	 * 
+	 *
 	 * <p>This method is based on polling and might miss very fast state transitions!
 	 */
-	public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus status, long maxWaitMillis) 
+	public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus status, long maxWaitMillis)
 			throws TimeoutException {
 
 		checkNotNull(eg);
@@ -103,7 +106,7 @@ public class ExecutionGraphTestUtils {
 		final long deadline = maxWaitMillis == 0 ? Long.MAX_VALUE : System.nanoTime() + (maxWaitMillis * 1_000_000);
 
 		while (eg.getState() != status && System.nanoTime() < deadline) {
-			try { 
+			try {
 				Thread.sleep(2);
 			} catch (InterruptedException ignored) {}
 		}
@@ -280,7 +283,7 @@ public class ExecutionGraphTestUtils {
 
 	/**
 	 * Creates an execution graph containing the given vertices.
-	 * 
+	 *
 	 * <p>The execution graph uses {@link NoRestartStrategy} as the restart strategy.
 	 */
 	public static ExecutionGraph createSimpleTestGraph(JobID jid, JobVertex... vertices) throws Exception {
@@ -339,6 +342,7 @@ public class ExecutionGraphTestUtils {
 				restartStrategy,
 				new UnregisteredMetricsGroup(),
 				1,
+				null,
 				TEST_LOGGER);
 	}
 
@@ -368,8 +372,7 @@ public class ExecutionGraphTestUtils {
 
 	@SuppressWarnings("serial")
 	public static class SimpleActorGateway extends BaseTestingActorGateway {
-		
-		public TaskDeploymentDescriptor lastTDD;
+
 
 		public SimpleActorGateway(ExecutionContext executionContext){
 			super(executionContext);
@@ -379,7 +382,6 @@ public class ExecutionGraphTestUtils {
 		public Object handleMessage(Object message) {
 			if (message instanceof SubmitTask) {
 				SubmitTask submitTask = (SubmitTask) message;
-				lastTDD = submitTask.tasks();
 				return Acknowledge.get();
 			} else if(message instanceof CancelTask) {
 				return Acknowledge.get();
@@ -392,6 +394,35 @@ public class ExecutionGraphTestUtils {
 	}
 
 	@SuppressWarnings("serial")
+	public static class SimpleActorGatewayWithTDD extends SimpleActorGateway {
+
+		public TaskDeploymentDescriptor lastTDD;
+		private final PermanentBlobService blobCache;
+
+		public SimpleActorGatewayWithTDD(ExecutionContext executionContext, PermanentBlobService blobCache) {
+			super(executionContext);
+			this.blobCache = blobCache;
+		}
+
+		@Override
+		public Object handleMessage(Object message) {
+			if(message instanceof SubmitTask) {
+				SubmitTask submitTask = (SubmitTask) message;
+				lastTDD = submitTask.tasks();
+				try {
+					lastTDD.loadBigData(blobCache);
+					return Acknowledge.get();
+				} catch (Exception e) {
+					e.printStackTrace();
+					return new Status.Failure(e);
+				}
+			} else {
+				return super.handleMessage(message);
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
 	public static class SimpleFailingActorGateway extends BaseTestingActorGateway {
 
 		public SimpleFailingActorGateway(ExecutionContext executionContext) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index c9b7a40..2ba4194 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -220,6 +220,7 @@ public class ExecutionVertexLocalityTest extends TestLogger {
 			new FixedDelayRestartStrategy(10, 0L),
 			new UnregisteredMetricsGroup(),
 			1,
+			null,
 			log);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index 6b872ef..d631de9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -141,7 +141,8 @@ public class FailoverRegionTest extends TestLogger {
 				Collections.emptyList(),
 				Collections.emptyList(),
 				slotProvider,
-				ExecutionGraph.class.getClassLoader());
+				ExecutionGraph.class.getClassLoader(),
+				null);
 
 		eg.attachJobGraph(ordered);
 
@@ -268,7 +269,8 @@ public class FailoverRegionTest extends TestLogger {
 				Collections.emptyList(),
 				Collections.emptyList(),
 				scheduler,
-				ExecutionGraph.class.getClassLoader());
+				ExecutionGraph.class.getClassLoader(),
+				null);
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -345,7 +347,8 @@ public class FailoverRegionTest extends TestLogger {
 				Collections.emptyList(),
 				Collections.emptyList(),
 				scheduler,
-				ExecutionGraph.class.getClassLoader());
+				ExecutionGraph.class.getClassLoader(),
+				null);
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -458,7 +461,8 @@ public class FailoverRegionTest extends TestLogger {
 				Collections.emptyList(),
 				Collections.emptyList(),
 				scheduler,
-				ExecutionGraph.class.getClassLoader());
+				ExecutionGraph.class.getClassLoader(),
+				null);
 		try {
 			eg.attachJobGraph(ordered);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
index f441970..986fb39 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
@@ -175,7 +175,8 @@ public class GlobalModVersionTest {
 				Collections.emptyList(),
 				Collections.emptyList(),
 				slotProvider,
-				getClass().getClassLoader());
+				getClass().getClassLoader(),
+				null);
 
 		JobVertex jv = new JobVertex("test vertex");
 		jv.setInvokableClass(NoOpInvokable.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
index c52b59c..9d7cd90 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
@@ -299,7 +299,8 @@ public class IndividualRestartsConcurrencyTest {
 				Collections.emptyList(),
 				Collections.emptyList(),
 				slotProvider,
-				getClass().getClassLoader());
+				getClass().getClassLoader(),
+				null);
 
 		JobVertex jv = new JobVertex("test vertex");
 		jv.setInvokableClass(NoOpInvokable.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
index 3c58616..a0351fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
@@ -321,7 +321,8 @@ public class PipelinedRegionFailoverConcurrencyTest {
 				Collections.emptyList(),
 				Collections.emptyList(),
 				slotProvider,
-				getClass().getClassLoader());
+				getClass().getClassLoader(),
+				null);
 
 		JobVertex jv = new JobVertex("test vertex");
 		jv.setInvokableClass(NoOpInvokable.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
index 7c4151b..ecbdc46 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
@@ -106,7 +106,8 @@ public class RestartPipelinedRegionStrategyTest {
             Collections.emptyList(),
             Collections.emptyList(),
             scheduler,
-            ExecutionGraph.class.getClassLoader());
+            ExecutionGraph.class.getClassLoader(),
+			null);
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -191,7 +192,8 @@ public class RestartPipelinedRegionStrategyTest {
             Collections.emptyList(),
             Collections.emptyList(),
             scheduler,
-            ExecutionGraph.class.getClassLoader());
+            ExecutionGraph.class.getClassLoader(),
+			null);
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -281,7 +283,8 @@ public class RestartPipelinedRegionStrategyTest {
             Collections.emptyList(),
             Collections.emptyList(),
             scheduler,
-            ExecutionGraph.class.getClassLoader());
+            ExecutionGraph.class.getClassLoader(),
+			null);
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -362,7 +365,8 @@ public class RestartPipelinedRegionStrategyTest {
             Collections.emptyList(),
             Collections.emptyList(),
             scheduler,
-            ExecutionGraph.class.getClassLoader());
+            ExecutionGraph.class.getClassLoader(),
+			null);
 		try {
 			eg.attachJobGraph(ordered);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
index 55bf711..5e96dfd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
@@ -639,6 +639,7 @@ public class PipelinedFailoverRegionBuildingTest extends TestLogger {
 				new NoRestartStrategy(),
 				new UnregisteredMetricsGroup(),
 				1000,
+				null,
 				log);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 0f21b55..2946d5e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -91,22 +92,23 @@ public class JobMasterTest extends TestLogger {
 
 		final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
 		final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
-		BlobServer blobServer = mock(BlobServer.class);
-		when(blobServer.getPort()).thenReturn(1337);
 
 		final JobGraph jobGraph = new JobGraph();
 
-		try {
+		Configuration configuration = new Configuration();
+		try (BlobServer blobServer = new BlobServer(configuration, new VoidBlobStore())) {
+			blobServer.start();
+
 			final JobMaster jobMaster = new JobMaster(
 				rpc,
 				jmResourceId,
 				jobGraph,
-				new Configuration(),
+				configuration,
 				haServices,
 				heartbeatServices,
 				Executors.newScheduledThreadPool(1),
 				blobServer,
-				mock(BlobLibraryCacheManager.class),
+				new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
 				mock(RestartStrategyFactory.class),
 				testingTimeout,
 				null,
@@ -197,17 +199,20 @@ public class JobMasterTest extends TestLogger {
 
 		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 
-		try {
+		Configuration configuration = new Configuration();
+		try (BlobServer blobServer = new BlobServer(configuration, new VoidBlobStore())) {
+			blobServer.start();
+
 			final JobMaster jobMaster = new JobMaster(
 				rpc,
 				jmResourceId,
 				jobGraph,
-				new Configuration(),
+				configuration,
 				haServices,
 				heartbeatServices,
 				Executors.newScheduledThreadPool(1),
-				mock(BlobServer.class),
-				mock(BlobLibraryCacheManager.class),
+				blobServer,
+				new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
 				mock(RestartStrategyFactory.class),
 				testingTimeout,
 				null,

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9df74e/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 5b80dd5..e106238 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -680,8 +680,9 @@ public class TaskExecutorTest extends TestLogger {
 		SerializedValue<TaskInformation> serializedJobVertexInformation = new SerializedValue<>(taskInformation);
 
 		final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-				serializedJobInformation,
-				serializedJobVertexInformation,
+				jobId,
+				new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+				new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobVertexInformation),
 				new ExecutionAttemptID(),
 				allocationId,
 				0,
@@ -1286,8 +1287,9 @@ public class TaskExecutorTest extends TestLogger {
 			SerializedValue<TaskInformation> serializedJobVertexInformation = new SerializedValue<>(taskInformation);
 
 			final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-				serializedJobInformation,
-				serializedJobVertexInformation,
+				jobId,
+				new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+				new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobVertexInformation),
 				new ExecutionAttemptID(),
 				allocationId1,
 				0,