You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/24 16:09:39 UTC

[6/6] flink git commit: [FLINK-6046] Introduce BlobWriter interface to abstract BlobServer from ExecutionGraph

[FLINK-6046] Introduce BlobWriter interface to abstract BlobServer from ExecutionGraph


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ebe3fb5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ebe3fb5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ebe3fb5

Branch: refs/heads/master
Commit: 5ebe3fb55f2ce9299660989d40aa6d96d45a6d8c
Parents: 5ff07e6
Author: Till Rohrmann <tr...@apache.org>
Authored: Sat Oct 21 12:31:21 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Oct 24 18:08:31 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/blob/BlobServer.java   |  70 +----------
 .../apache/flink/runtime/blob/BlobWriter.java   | 115 +++++++++++++++++++
 .../flink/runtime/blob/VoidBlobWriter.java      |  55 +++++++++
 .../runtime/executiongraph/ExecutionGraph.java  |  27 +++--
 .../executiongraph/ExecutionGraphBuilder.java   |   6 +-
 .../executiongraph/ExecutionJobVertex.java      |   8 +-
 .../CheckpointSettingsSerializableTest.java     |  29 ++---
 ...ExecutionGraphCheckpointCoordinatorTest.java |   3 +-
 .../ExecutionGraphDeploymentTest.java           |  17 +--
 ...ecutionGraphDeploymentWithBlobCacheTest.java |   1 +
 ...cutionGraphDeploymentWithBlobServerTest.java |   4 +
 .../ExecutionGraphSchedulingTest.java           |   3 +-
 .../executiongraph/ExecutionGraphTestUtils.java |  29 ++---
 .../ExecutionVertexLocalityTest.java            |   3 +-
 .../PipelinedRegionFailoverConcurrencyTest.java |   3 +-
 .../RestartPipelinedRegionStrategyTest.java     |   9 +-
 .../PipelinedFailoverRegionBuildingTest.java    |  29 ++---
 .../partitioner/RescalePartitionerTest.java     |   3 +-
 18 files changed, 268 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 0f6b350..01fb808 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -24,11 +24,9 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.net.SSLUtils;
-import org.apache.flink.types.Either;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,7 +66,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * spawning threads to handle these requests. Furthermore, it takes care of creating the directory structure to store
  * the BLOBs or temporarily cache them.
  */
-public class BlobServer extends Thread implements BlobService, PermanentBlobService, TransientBlobService {
+public class BlobServer extends Thread implements BlobService, BlobWriter, PermanentBlobService, TransientBlobService {
 
 	/** The log object used for debugging. */
 	private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class);
@@ -565,41 +563,13 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 		return (TransientBlobKey) putInputStream(jobId, inputStream, TRANSIENT_BLOB);
 	}
 
-	/**
-	 * Uploads the data of the given byte array for the given job to the BLOB server and makes it
-	 * a permanent BLOB.
-	 *
-	 * @param jobId
-	 * 		the ID of the job the BLOB belongs to
-	 * @param value
-	 * 		the buffer to upload
-	 *
-	 * @return the computed BLOB key identifying the BLOB on the server
-	 *
-	 * @throws IOException
-	 * 		thrown if an I/O error occurs while writing it to a local file, or uploading it to the HA
-	 * 		store
-	 */
+	@Override
 	public PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException {
 		checkNotNull(jobId);
 		return (PermanentBlobKey) putBuffer(jobId, value, PERMANENT_BLOB);
 	}
 
-	/**
-	 * Uploads the data from the given input stream for the given job to the BLOB server and makes it
-	 * a permanent BLOB.
-	 *
-	 * @param jobId
-	 * 		ID of the job this blob belongs to
-	 * @param inputStream
-	 * 		the input stream to read the data from
-	 *
-	 * @return the computed BLOB key identifying the BLOB on the server
-	 *
-	 * @throws IOException
-	 * 		thrown if an I/O error occurs while reading the data from the input stream, writing it to a
-	 * 		local file, or uploading it to the HA store
-	 */
+	@Override
 	public PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException {
 		checkNotNull(jobId);
 		return (PermanentBlobKey) putInputStream(jobId, inputStream, PERMANENT_BLOB);
@@ -886,6 +856,7 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	 *
 	 * @return configuration
 	 */
+	@Override
 	public final int getMinOffloadingSize() {
 		return blobServiceConfiguration.getInteger(BlobServerOptions.OFFLOAD_MINSIZE);
 	}
@@ -943,37 +914,4 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 			return new ArrayList<>(activeConnections);
 		}
 	}
-
-	/**
-	 * Serializes the given value and offloads it to the BlobServer if its size exceeds the minimum
-	 * offloading size of the BlobServer.
-	 *
-	 * @param value to serialize
-	 * @param jobId to which the value belongs.
-	 * @param blobServer
-	 * @param <T>
-	 * @return
-	 * @throws IOException
-	 */
-	public static <T> Either<SerializedValue<T>, PermanentBlobKey> tryOffload(
-		T value,
-		JobID jobId,
-		@Nullable BlobServer blobServer) throws IOException {
-
-		final SerializedValue<T> serializedValue = new SerializedValue<>(value);
-
-		if (blobServer == null || serializedValue.getByteArray().length < blobServer.getMinOffloadingSize()) {
-			return Either.Left(new SerializedValue<>(value));
-		} else {
-			try {
-				final PermanentBlobKey permanentBlobKey = blobServer.putPermanent(jobId, serializedValue.getByteArray());
-
-				return Either.Right(permanentBlobKey);
-			} catch (IOException e) {
-				LOG.warn("Failed to offload value " + value + " for job " + jobId + " to BLOB store.", e);
-
-				return Either.Left(serializedValue);
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
new file mode 100644
index 0000000..cc3bd1d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * BlobWriter is used to upload data to the BLOB store.
+ */
+public interface BlobWriter {
+
+	Logger LOG = LoggerFactory.getLogger(BlobWriter.class);
+
+	/**
+	 * Uploads the data of the given byte array for the given job to the BLOB server and makes it
+	 * a permanent BLOB.
+	 *
+	 * @param jobId
+	 * 		the ID of the job the BLOB belongs to
+	 * @param value
+	 * 		the buffer to upload
+	 *
+	 * @return the computed BLOB key identifying the BLOB on the server
+	 *
+	 * @throws IOException
+	 * 		thrown if an I/O error occurs while writing it to a local file, or uploading it to the HA
+	 * 		store
+	 */
+	PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException;
+
+	/**
+	 * Uploads the data from the given input stream for the given job to the BLOB server and makes it
+	 * a permanent BLOB.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to
+	 * @param inputStream
+	 * 		the input stream to read the data from
+	 *
+	 * @return the computed BLOB key identifying the BLOB on the server
+	 *
+	 * @throws IOException
+	 * 		thrown if an I/O error occurs while reading the data from the input stream, writing it to a
+	 * 		local file, or uploading it to the HA store
+	 */
+	PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException;
+
+	/**
+	 * Returns the min size before data will be offloaded to the BLOB store.
+	 *
+	 * @return minimum offloading size
+	 */
+	int getMinOffloadingSize();
+
+	/**
+	 * Serializes the given value and offloads it to the BlobServer if its size exceeds the minimum
+	 * offloading size of the BlobServer.
+	 *
+	 * @param value to serialize
+	 * @param jobId to which the value belongs.
+	 * @param blobWriter to use to offload the serialized value
+	 * @param <T> type of the value to serialize
+	 * @return Either the serialized value or the stored blob key
+	 * @throws IOException if the data cannot be serialized
+	 */
+	static <T> Either<SerializedValue<T>, PermanentBlobKey> serializeAndTryOffload(
+			T value,
+			JobID jobId,
+			BlobWriter blobWriter) throws IOException {
+		Preconditions.checkNotNull(value);
+		Preconditions.checkNotNull(jobId);
+		Preconditions.checkNotNull(blobWriter);
+
+		final SerializedValue<T> serializedValue = new SerializedValue<>(value);
+
+		if (serializedValue.getByteArray().length < blobWriter.getMinOffloadingSize()) {
+			return Either.Left(new SerializedValue<>(value));
+		} else {
+			try {
+				final PermanentBlobKey permanentBlobKey = blobWriter.putPermanent(jobId, serializedValue.getByteArray());
+
+				return Either.Right(permanentBlobKey);
+			} catch (IOException e) {
+				LOG.warn("Failed to offload value {} for job {} to BLOB store.", value, jobId, e);
+
+				return Either.Left(serializedValue);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobWriter.java
new file mode 100644
index 0000000..7830437
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobWriter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * BlobWriter which does not support writing BLOBs to a store. This class is
+ * mainly used for testing purposes where we don't want to store data in the
+ * BLOB store.
+ */
+@VisibleForTesting
+public class VoidBlobWriter implements BlobWriter {
+
+	private static final VoidBlobWriter INSTANCE = new VoidBlobWriter();
+
+	@Override
+	public PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException {
+		throw new IOException("The VoidBlobWriter cannot write data to the BLOB store.");
+	}
+
+	@Override
+	public PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException {
+		throw new IOException("The VoidBlobWriter cannot write data to the BLOB store.");
+	}
+
+	@Override
+	public int getMinOffloadingSize() {
+		return Integer.MAX_VALUE;
+	}
+
+	public static VoidBlobWriter getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 74a68ab..f002c5c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -31,8 +31,9 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.StoppingException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
@@ -76,8 +77,6 @@ import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
@@ -174,6 +173,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/** Job specific information like the job id, job name, job configuration, etc. */
 	private final JobInformation jobInformation;
 
+	/** Serialized job information or a blob key pointing to the offloaded job information */
 	private final Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey;
 
 	/** The executor which is used to execute futures. */
@@ -232,6 +232,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/** Registered KvState instances reported by the TaskManagers. */
 	private final KvStateLocationRegistry kvStateLocationRegistry;
 
+	/** Blob writer used to offload RPC messages */
+	private final BlobWriter blobWriter;
+
 	/** The total number of vertices currently in the execution graph */
 	private int numVerticesTotal;
 
@@ -276,9 +279,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	// ------ Fields that are only relevant for archived execution graphs ------------
 	private String jsonPlan;
 
-	@Nullable
-	private BlobServer blobServer;
-
 	// --------------------------------------------------------------------------------------------
 	//   Constructors
 	// --------------------------------------------------------------------------------------------
@@ -352,7 +352,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			failoverStrategy,
 			slotProvider,
 			ExecutionGraph.class.getClassLoader(),
-			null);
+			VoidBlobWriter.getInstance());
 	}
 
 	public ExecutionGraph(
@@ -364,13 +364,15 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			FailoverStrategy.Factory failoverStrategyFactory,
 			SlotProvider slotProvider,
 			ClassLoader userClassLoader,
-			@Nullable BlobServer blobServer) throws IOException {
+			BlobWriter blobWriter) throws IOException {
 
 		checkNotNull(futureExecutor);
 
 		this.jobInformation = Preconditions.checkNotNull(jobInformation);
 
-		this.jobInformationOrBlobKey = BlobServer.tryOffload(jobInformation, jobInformation.getJobId(), blobServer);
+		this.blobWriter = Preconditions.checkNotNull(blobWriter);
+
+		this.jobInformationOrBlobKey = BlobWriter.serializeAndTryOffload(jobInformation, jobInformation.getJobId(), blobWriter);
 
 		this.futureExecutor = Preconditions.checkNotNull(futureExecutor);
 		this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
@@ -403,8 +405,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		// is ready by the time the failover strategy sees it
 		this.failoverStrategy = checkNotNull(failoverStrategyFactory.create(this), "null failover strategy");
 		LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName());
-
-		this.blobServer = blobServer;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -705,9 +705,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		return this.stateTimestamps[status.ordinal()];
 	}
 
-	@Nullable
-	public final BlobServer getBlobServer() {
-		return blobServer;
+	public final BlobWriter getBlobWriter() {
+		return blobWriter;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 8d48432..2a4315d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -27,7 +27,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
@@ -91,7 +91,7 @@ public class ExecutionGraphBuilder {
 			RestartStrategy restartStrategy,
 			MetricGroup metrics,
 			int parallelismForAutoMax,
-			BlobServer blobServer,
+			BlobWriter blobWriter,
 			Logger log)
 		throws JobExecutionException, JobException {
 
@@ -124,7 +124,7 @@ public class ExecutionGraphBuilder {
                     failoverStrategy,
                     slotProvider,
                     classLoader,
-                    blobServer);
+                    blobWriter);
 		} catch (IOException e) {
 			throw new JobException("Could not create the ExecutionGraph.", e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 9adaf45..90224b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -32,7 +32,7 @@ import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -368,7 +368,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		// serialize the task information!
 		synchronized (stateMonitor) {
 			if (taskInformationOrBlobKey == null) {
-				final BlobServer blobServer = graph.getBlobServer();
+				final BlobWriter blobWriter = graph.getBlobWriter();
 
 				final TaskInformation taskInformation = new TaskInformation(
 					jobVertex.getID(),
@@ -378,10 +378,10 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 					jobVertex.getInvokableClassName(),
 					jobVertex.getConfiguration());
 
-				taskInformationOrBlobKey = BlobServer.tryOffload(
+				taskInformationOrBlobKey = BlobWriter.serializeAndTryOffload(
 					taskInformation,
 					getJobId(),
-					blobServer);
+					blobWriter);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index e500036..7e85167 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
@@ -95,20 +96,20 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
 		final JobGraph copy = CommonTestUtils.createCopySerializable(jobGraph);
 
 		final ExecutionGraph eg = ExecutionGraphBuilder.buildGraph(
-				null,
-				copy,
-				new Configuration(),
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
-				mock(SlotProvider.class),
-				classLoader,
-				new StandaloneCheckpointRecoveryFactory(),
-				Time.seconds(10),
-				new NoRestartStrategy(),
-				new UnregisteredMetricsGroup(),
-				10,
-				null,
-				log);
+			null,
+			copy,
+			new Configuration(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			mock(SlotProvider.class),
+			classLoader,
+			new StandaloneCheckpointRecoveryFactory(),
+			Time.seconds(10),
+			new NoRestartStrategy(),
+			new UnregisteredMetricsGroup(),
+			10,
+			VoidBlobWriter.getInstance(),
+			log);
 
 		assertEquals(1, eg.getCheckpointCoordinator().getNumberOfRegisteredMasterHooks());
 		assertTrue(jobGraph.getCheckpointingSettings().getDefaultStateBackend().deserializeValue(classLoader) instanceof CustomStateBackend);

http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 1489f1a..5893d1d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.executiongraph.DummyJobInformation;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
@@ -86,7 +87,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 			new RestartAllStrategy.Factory(),
 			new Scheduler(TestingUtils.defaultExecutionContext()),
 			ClassLoader.getSystemClassLoader(),
-			null);
+			VoidBlobWriter.getInstance());
 
 		executionGraph.enableCheckpointing(
 				100,

http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index b9ca508..5c80405 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -28,8 +28,9 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -81,9 +82,9 @@ import static org.junit.Assert.fail;
 public class ExecutionGraphDeploymentTest extends TestLogger {
 
 	/**
-	 * BLOB server instance to use for the job graph (may be <tt>null</tt>).
+	 * BLOB server instance to use for the job graph.
 	 */
-	protected BlobServer blobServer = null;
+	protected BlobWriter blobWriter = VoidBlobWriter.getInstance();
 
 	/**
 	 * Permanent BLOB cache instance to use for the actor gateway that handles the {@link
@@ -154,7 +155,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 				new RestartAllStrategy.Factory(),
 				new Scheduler(TestingUtils.defaultExecutionContext()),
 				ExecutionGraph.class.getClassLoader(),
-				blobServer);
+				blobWriter);
 
 			checkJobOffloaded(eg);
 
@@ -168,7 +169,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 			ExecutionGraphTestUtils.SimpleActorGatewayWithTDD instanceGateway =
 				new ExecutionGraphTestUtils.SimpleActorGatewayWithTDD(
 					TestingUtils.directExecutionContext(),
-					blobCache == null ? blobServer : blobCache);
+					blobCache);
 
 			final Instance instance = getInstance(new ActorTaskManagerGateway(instanceGateway));
 
@@ -436,7 +437,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 			new RestartAllStrategy.Factory(),
 			scheduler,
 			ExecutionGraph.class.getClassLoader(),
-			blobServer);
+			blobWriter);
 
 		checkJobOffloaded(eg);
 
@@ -517,7 +518,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 			new RestartAllStrategy.Factory(),
 			scheduler,
 			ExecutionGraph.class.getClassLoader(),
-			blobServer);
+			blobWriter);
 		checkJobOffloaded(eg);
 		
 		eg.setQueuedSchedulingAllowed(false);
@@ -599,7 +600,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 			new NoRestartStrategy(),
 			new UnregisteredMetricsGroup(),
 			1,
-			blobServer,
+			blobWriter,
 			LoggerFactory.getLogger(getClass()));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java
index 0fcf8c5..25d218e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java
@@ -44,6 +44,7 @@ public class ExecutionGraphDeploymentWithBlobCacheTest extends ExecutionGraphDep
 		config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
 		blobServer = new BlobServer(config, new VoidBlobStore());
 		blobServer.start();
+		blobWriter = blobServer;
 
 		InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort());
 		blobCache = new PermanentBlobCache(serverAddress, config, new VoidBlobStore());

http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java
index 59d8bc2..232a259 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java
@@ -50,12 +50,16 @@ public class ExecutionGraphDeploymentWithBlobServerTest extends ExecutionGraphDe
 
 	private Set<byte[]> seenHashes = Collections.newSetFromMap(new ConcurrentHashMap<byte[], Boolean>());
 
+	protected BlobServer blobServer = null;
+
 	@Before
 	public void setupBlobServer() throws IOException {
 		Configuration config = new Configuration();
 		// always offload the serialized job and task information
 		config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
 		blobServer = Mockito.spy(new BlobServer(config, new VoidBlobStore()));
+		blobWriter = blobServer;
+		blobCache = blobServer;
 
 		seenHashes.clear();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index c8cab9f..b90c306 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -555,7 +556,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 			new NoRestartStrategy(),
 			new UnregisteredMetricsGroup(),
 			1,
-			null,
+			VoidBlobWriter.getInstance(),
 			log);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 1f9fa82..017e85f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -330,20 +331,20 @@ public class ExecutionGraphTestUtils {
 		checkNotNull(vertices);
 
 		return ExecutionGraphBuilder.buildGraph(
-				null,
-				new JobGraph(jid, "test job", vertices),
-				new Configuration(),
-				executor,
-				executor,
-				slotProvider,
-				ExecutionGraphTestUtils.class.getClassLoader(),
-				new StandaloneCheckpointRecoveryFactory(),
-				Time.seconds(10),
-				restartStrategy,
-				new UnregisteredMetricsGroup(),
-				1,
-				null,
-				TEST_LOGGER);
+			null,
+			new JobGraph(jid, "test job", vertices),
+			new Configuration(),
+			executor,
+			executor,
+			slotProvider,
+			ExecutionGraphTestUtils.class.getClassLoader(),
+			new StandaloneCheckpointRecoveryFactory(),
+			Time.seconds(10),
+			restartStrategy,
+			new UnregisteredMetricsGroup(),
+			1,
+			VoidBlobWriter.getInstance(),
+			TEST_LOGGER);
 	}
 
 	public static JobVertex createNoOpVertex(int parallelism) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index 2ba4194..5f12646 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -220,7 +221,7 @@ public class ExecutionVertexLocalityTest extends TestLogger {
 			new FixedDelayRestartStrategy(10, 0L),
 			new UnregisteredMetricsGroup(),
 			1,
-			null,
+			VoidBlobWriter.getInstance(),
 			log);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
index c78e193..ac34c62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
@@ -317,7 +318,7 @@ public class PipelinedRegionFailoverConcurrencyTest {
 			failoverStrategy,
 			slotProvider,
 			getClass().getClassLoader(),
-			null);
+			VoidBlobWriter.getInstance());
 
 		JobVertex jv = new JobVertex("test vertex");
 		jv.setInvokableClass(NoOpInvokable.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
index 8198df5..059b8a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
 import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -102,7 +103,7 @@ public class RestartPipelinedRegionStrategyTest {
             new RestartPipelinedRegionStrategy.Factory(),
             scheduler,
             ExecutionGraph.class.getClassLoader(),
-			null);
+			VoidBlobWriter.getInstance());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -186,7 +187,7 @@ public class RestartPipelinedRegionStrategyTest {
             new RestartPipelinedRegionStrategy.Factory(),
             scheduler,
             ExecutionGraph.class.getClassLoader(),
-			null);
+			VoidBlobWriter.getInstance());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -275,7 +276,7 @@ public class RestartPipelinedRegionStrategyTest {
             new RestartPipelinedRegionStrategy.Factory(),
             scheduler,
             ExecutionGraph.class.getClassLoader(),
-			null);
+			VoidBlobWriter.getInstance());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -355,7 +356,7 @@ public class RestartPipelinedRegionStrategyTest {
             new RestartPipelinedRegionStrategy.Factory(),
             scheduler,
             ExecutionGraph.class.getClassLoader(),
-			null);
+			VoidBlobWriter.getInstance());
 		try {
 			eg.attachJobGraph(ordered);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
index 5e96dfd..4709bce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -627,19 +628,19 @@ public class PipelinedFailoverRegionBuildingTest extends TestLogger {
 				FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME);
 
 		return ExecutionGraphBuilder.buildGraph(
-				null,
-				jobGraph,
-				jobManagerConfig,
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
-				mock(SlotProvider.class),
-				PipelinedFailoverRegionBuildingTest.class.getClassLoader(),
-				new StandaloneCheckpointRecoveryFactory(),
-				Time.seconds(10),
-				new NoRestartStrategy(),
-				new UnregisteredMetricsGroup(),
-				1000,
-				null,
-				log);
+			null,
+			jobGraph,
+			jobManagerConfig,
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			mock(SlotProvider.class),
+			PipelinedFailoverRegionBuildingTest.class.getClassLoader(),
+			new StandaloneCheckpointRecoveryFactory(),
+			Time.seconds(10),
+			new NoRestartStrategy(),
+			new UnregisteredMetricsGroup(),
+			1000,
+			VoidBlobWriter.getInstance(),
+			log);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index b2e63be..368dfee 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.executiongraph.DummyJobInformation;
 import org.apache.flink.runtime.executiongraph.ExecutionEdge;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -154,7 +155,7 @@ public class RescalePartitionerTest extends TestLogger {
 			new RestartAllStrategy.Factory(),
 			new Scheduler(TestingUtils.defaultExecutionContext()),
 			ExecutionGraph.class.getClassLoader(),
-			null);
+			VoidBlobWriter.getInstance());
 		try {
 			eg.attachJobGraph(jobVertices);
 		}