Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/11 17:45:30 UTC

[1/4] flink git commit: [FLINK-7017] Remove netty usages in flink-tests

Repository: flink
Updated Branches:
  refs/heads/master 211d0963e -> 021d27d54


[FLINK-7017] Remove netty usages in flink-tests

This closes #4196.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed3b3266
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed3b3266
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed3b3266

Branch: refs/heads/master
Commit: ed3b3266cdb4811ffb3539640c57354ae71adc24
Parents: 748eba1
Author: zentol <ch...@apache.org>
Authored: Tue Jun 27 15:08:50 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 11 18:19:25 2017 +0200

----------------------------------------------------------------------
 flink-runtime-web/pom.xml                       |   7 +
 .../runtime/webmonitor/WebFrontendITCase.java   | 290 ++++++++++++++++++
 .../test/checkpointing/RescalingITCase.java     |   3 +-
 .../flink/test/web/WebFrontendITCase.java       | 300 -------------------
 4 files changed, 299 insertions(+), 301 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ed3b3266/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index 040e844..988cd76 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -83,6 +83,13 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
 			<groupId>org.apache.curator</groupId>
 			<artifactId>curator-test</artifactId>
 			<version>${curator.version}</version>

http://git-wip-us.apache.org/repos/asf/flink/blob/ed3b3266/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
new file mode 100644
index 0000000..711bbdb
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.testutils.StoppableInvokable;
+import org.apache.flink.runtime.webmonitor.files.MimeTypes;
+import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.commons.io.FileUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the WebFrontend.
+ */
+public class WebFrontendITCase extends TestLogger {
+
+	private static final int NUM_TASK_MANAGERS = 2;
+	private static final int NUM_SLOTS = 4;
+
+	private static LocalFlinkMiniCluster cluster;
+
+	private static int port = -1;
+
+	@BeforeClass
+	public static void initialize() throws Exception {
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS);
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
+		config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
+
+		File logDir = File.createTempFile("TestBaseUtils-logdir", null);
+		assertTrue("Unable to delete temp file", logDir.delete());
+		assertTrue("Unable to create temp directory", logDir.mkdir());
+		File logFile = new File(logDir, "jobmanager.log");
+		File outFile = new File(logDir, "jobmanager.out");
+
+		Files.createFile(logFile.toPath());
+		Files.createFile(outFile.toPath());
+
+		config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.getAbsolutePath());
+		config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
+
+		cluster = new LocalFlinkMiniCluster(config, false);
+		cluster.start();
+
+		port = cluster.webMonitor().get().getServerPort();
+	}
+
+	@Test
+	public void getFrontPage() {
+		try {
+			String fromHTTP = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/index.html");
+			String text = "Apache Flink Dashboard";
+			assertTrue("Startpage should contain " + text, fromHTTP.contains(text));
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void getNumberOfTaskManagers() {
+		try {
+			String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
+
+			ObjectMapper mapper = new ObjectMapper();
+			JsonNode response = mapper.readTree(json);
+			ArrayNode taskManagers = (ArrayNode) response.get("taskmanagers");
+
+			assertNotNull(taskManagers);
+			assertEquals(cluster.numTaskManagers(), taskManagers.size());
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void getTaskmanagers() {
+		try {
+			String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
+
+			ObjectMapper mapper = new ObjectMapper();
+			JsonNode parsed = mapper.readTree(json);
+			ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
+
+			assertNotNull(taskManagers);
+			assertEquals(cluster.numTaskManagers(), taskManagers.size());
+
+			JsonNode taskManager = taskManagers.get(0);
+			assertNotNull(taskManager);
+			assertEquals(NUM_SLOTS, taskManager.get("slotsNumber").asInt());
+			assertTrue(taskManager.get("freeSlots").asInt() <= NUM_SLOTS);
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void getLogAndStdoutFiles() throws Exception {
+		WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration());
+
+		FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
+		String logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/log");
+		assertTrue(logs.contains("job manager log"));
+
+		FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
+		logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/stdout");
+		assertTrue(logs.contains("job manager out"));
+	}
+
+	@Test
+	public void getTaskManagerLogAndStdoutFiles() {
+		try {
+			String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
+
+			ObjectMapper mapper = new ObjectMapper();
+			JsonNode parsed = mapper.readTree(json);
+			ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
+			JsonNode taskManager = taskManagers.get(0);
+			String id = taskManager.get("id").asText();
+
+			WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration());
+
+			// we check for job manager log files, since no separate taskmanager logs exist
+			FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
+			String logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/log");
+			assertTrue(logs.contains("job manager log"));
+
+			FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
+			logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/stdout");
+			assertTrue(logs.contains("job manager out"));
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void getConfiguration() {
+		try {
+			String config = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/config");
+
+			Map<String, String> conf = WebMonitorUtils.fromKeyValueJsonArray(config);
+			assertEquals(
+				cluster.configuration().getString("taskmanager.numberOfTaskSlots", null),
+				conf.get(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS));
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testStop() throws Exception {
+		// this only works if there is no active job at this point
+		assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty());
+
+		// Create a task
+		final JobVertex sender = new JobVertex("Sender");
+		sender.setParallelism(2);
+		sender.setInvokableClass(StoppableInvokable.class);
+
+		final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
+		final JobID jid = jobGraph.getJobID();
+
+		cluster.submitJobDetached(jobGraph);
+
+		// wait for job to show up
+		while (cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+			Thread.sleep(10);
+		}
+
+		final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
+		final Deadline deadline = testTimeout.fromNow();
+
+		while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+			try (HttpTestClient client = new HttpTestClient("localhost", port)) {
+				// Request the file from the web server
+				client.sendDeleteRequest("/jobs/" + jid + "/stop", deadline.timeLeft());
+				HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
+
+				assertEquals(HttpResponseStatus.OK, response.getStatus());
+				assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json"));
+				assertEquals("{}", response.getContent());
+			}
+
+			Thread.sleep(20);
+		}
+
+		// ensure we can access job details when it's finished (FLINK-4011)
+		try (HttpTestClient client = new HttpTestClient("localhost", port)) {
+			FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+			client.sendGetRequest("/jobs/" + jid + "/config", timeout);
+			HttpTestClient.SimpleHttpResponse response = client.getNextResponse(timeout);
+
+			assertEquals(HttpResponseStatus.OK, response.getStatus());
+			assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json"));
+			assertEquals("{\"jid\":\"" + jid + "\",\"name\":\"Stoppable streaming test job\"," +
+				"\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"default\"," +
+				"\"job-parallelism\":-1,\"object-reuse-mode\":false,\"user-config\":{}}}", response.getContent());
+		}
+	}
+
+	@Test
+	public void testStopYarn() throws Exception {
+		// this only works if there is no active job at this point
+		assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty());
+
+		// Create a task
+		final JobVertex sender = new JobVertex("Sender");
+		sender.setParallelism(2);
+		sender.setInvokableClass(StoppableInvokable.class);
+
+		final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
+		final JobID jid = jobGraph.getJobID();
+
+		cluster.submitJobDetached(jobGraph);
+
+		// wait for job to show up
+		while (cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+			Thread.sleep(10);
+		}
+
+		final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
+		final Deadline deadline = testTimeout.fromNow();
+
+		while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+			try (HttpTestClient client = new HttpTestClient("localhost", port)) {
+				// Request the file from the web server
+				client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft());
+
+				HttpTestClient.SimpleHttpResponse response = client
+					.getNextResponse(deadline.timeLeft());
+
+				assertEquals(HttpResponseStatus.OK, response.getStatus());
+				assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json"));
+				assertEquals("{}", response.getContent());
+			}
+
+			Thread.sleep(20);
+		}
+	}
+}
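
The moved test's REST checks all follow one pattern: fetch an endpoint as a string, then navigate the JSON with Jackson. A standalone sketch of that parsing step, using a hypothetical response body shaped like the /taskmanagers endpoint (class name and sample values are illustrative):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ArrayNode;

    public class TaskManagersJsonExample {
        public static void main(String[] args) throws Exception {
            // hypothetical response body, shaped like the /taskmanagers endpoint
            String json = "{\"taskmanagers\":[{\"id\":\"tm-1\",\"slotsNumber\":4,\"freeSlots\":4}]}";
            ObjectMapper mapper = new ObjectMapper();
            JsonNode parsed = mapper.readTree(json);
            ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
            System.out.println(taskManagers.size());                    // 1
            System.out.println(taskManagers.get(0).get("id").asText()); // tm-1
        }
    }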

http://git-wip-us.apache.org/repos/asf/flink/blob/ed3b3266/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index e934c27..264b22e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -70,6 +70,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -866,7 +867,7 @@ public class RescalingITCase extends TestLogger {
 
 	private static class CollectionSink<IN> implements SinkFunction<IN> {
 
-		private static ConcurrentSet<Object> elements = new ConcurrentSet<Object>();
+		private static Set<Object> elements = Collections.newSetFromMap(new ConcurrentHashMap<Object, Boolean>());
 
 		private static final long serialVersionUID = -1652452958040267745L;
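
This one-line swap is the core of the netty removal in flink-tests: netty's internal ConcurrentSet is replaced by the JDK's set view over a ConcurrentHashMap. A minimal sketch of the same pattern, independent of Flink (class name is illustrative):

    import java.util.Collections;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class ConcurrentSetSketch {
        public static void main(String[] args) {
            // thread-safe Set backed by ConcurrentHashMap; no netty dependency needed
            Set<Object> elements =
                Collections.newSetFromMap(new ConcurrentHashMap<Object, Boolean>());
            elements.add("a");
            elements.add("a"); // duplicates are ignored, as with any Set
            System.out.println(elements.size()); // prints 1
        }
    }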
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ed3b3266/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
deleted file mode 100644
index 538ac98..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.web;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-
-import org.apache.commons.io.FileUtils;
-
-import org.apache.commons.math3.optim.nonlinear.vector.JacobianMultivariateVectorOptimizer;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.JobManagerCliOptions;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.runtime.testutils.StoppableInvokable;
-import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.runtime.webmonitor.files.MimeTypes;
-import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
-import org.apache.flink.test.util.TestBaseUtils;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import io.netty.handler.codec.http.HttpResponseStatus;
-
-import java.io.File;
-import java.nio.file.Files;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import static org.apache.flink.test.util.TestBaseUtils.getFromHTTP;
-
-public class WebFrontendITCase extends TestLogger {
-
-	private static final int NUM_TASK_MANAGERS = 2;
-	private static final int NUM_SLOTS = 4;
-	
-	private static LocalFlinkMiniCluster cluster;
-
-	private static int port = -1;
-	
-	@BeforeClass
-	public static void initialize() throws Exception {
-		Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS);
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
-		config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
-
-		File logDir = File.createTempFile("TestBaseUtils-logdir", null);
-		assertTrue("Unable to delete temp file", logDir.delete());
-		assertTrue("Unable to create temp directory", logDir.mkdir());
-		File logFile = new File(logDir, "jobmanager.log");
-		File outFile = new File(logDir, "jobmanager.out");
-		
-		Files.createFile(logFile.toPath());
-		Files.createFile(outFile.toPath());
-		
-		config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.getAbsolutePath());
-		config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
-
-		cluster = new LocalFlinkMiniCluster(config, false);
-		cluster.start();
-		
-		port = cluster.webMonitor().get().getServerPort();
-	}
-
-	@Test
-	public void getFrontPage() {
-		try {
-			String fromHTTP = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/index.html");
-			String text = "Apache Flink Dashboard";
-			assertTrue("Startpage should contain " + text, fromHTTP.contains(text));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void getNumberOfTaskManagers() {
-		try {
-			String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
-
-			ObjectMapper mapper = new ObjectMapper();
-			JsonNode response = mapper.readTree(json);
-			ArrayNode taskManagers = (ArrayNode) response.get("taskmanagers");
-
-			assertNotNull(taskManagers);
-			assertEquals(cluster.numTaskManagers(), taskManagers.size());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void getTaskmanagers() {
-		try {
-			String json = getFromHTTP("http://localhost:" + port + "/taskmanagers/");
-
-			ObjectMapper mapper = new ObjectMapper();
-			JsonNode parsed = mapper.readTree(json);
-			ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
-
-			assertNotNull(taskManagers);
-			assertEquals(cluster.numTaskManagers(), taskManagers.size());
-
-			JsonNode taskManager = taskManagers.get(0);
-			assertNotNull(taskManager);
-			assertEquals(NUM_SLOTS, taskManager.get("slotsNumber").asInt());
-			assertTrue(taskManager.get("freeSlots").asInt() <= NUM_SLOTS);
-		}
-		catch(Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void getLogAndStdoutFiles() throws Exception {
-		WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration());
-
-		FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
-		String logs = getFromHTTP("http://localhost:" + port + "/jobmanager/log");
-		assertTrue(logs.contains("job manager log"));
-
-		FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
-		logs = getFromHTTP("http://localhost:" + port + "/jobmanager/stdout");
-		assertTrue(logs.contains("job manager out"));
-	}
-
-	@Test
-	public void getTaskManagerLogAndStdoutFiles() {
-		try {
-			String json = getFromHTTP("http://localhost:" + port + "/taskmanagers/");
-
-			ObjectMapper mapper = new ObjectMapper();
-			JsonNode parsed = mapper.readTree(json);
-			ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
-			JsonNode taskManager = taskManagers.get(0);
-			String id = taskManager.get("id").asText();
-			
-			WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration());
-			
-			//we check for job manager log files, since no separate taskmanager logs exist
-			FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
-			String logs = getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/log");
-			assertTrue(logs.contains("job manager log"));
-
-			FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
-			logs = getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/stdout");
-			assertTrue(logs.contains("job manager out"));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void getConfiguration() {
-		try {
-			String config = getFromHTTP("http://localhost:" + port + "/jobmanager/config");
-
-			Map<String, String> conf = WebMonitorUtils.fromKeyValueJsonArray(config);
-			assertEquals(
-				cluster.configuration().getString("taskmanager.numberOfTaskSlots", null),
-				conf.get(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testStop() throws Exception {
-		// this only works if there is no active job at this point
-		assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty());
-		
-		// Create a task
-		final JobVertex sender = new JobVertex("Sender");
-		sender.setParallelism(2);
-		sender.setInvokableClass(StoppableInvokable.class);
-
-		final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
-		final JobID jid = jobGraph.getJobID();
-
-		cluster.submitJobDetached(jobGraph);
-
-		// wait for job to show up
-		while (cluster.getCurrentlyRunningJobsJava().isEmpty()) {
-			Thread.sleep(10);
-		}
-		
-		final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
-		final Deadline deadline = testTimeout.fromNow();
-
-		while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
-			try (HttpTestClient client = new HttpTestClient("localhost", port)) {
-				// Request the file from the web server
-				client.sendDeleteRequest("/jobs/" + jid + "/stop", deadline.timeLeft());
-				HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
-	
-				assertEquals(HttpResponseStatus.OK, response.getStatus());
-				assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json"));
-				assertEquals("{}", response.getContent());
-			}
-
-			Thread.sleep(20);
-		}
-
-		// ensure we can access job details when its finished (FLINK-4011)
-		try (HttpTestClient client = new HttpTestClient("localhost", port)) {
-			FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
-			client.sendGetRequest("/jobs/" + jid + "/config", timeout);
-			HttpTestClient.SimpleHttpResponse response = client.getNextResponse(timeout);
-
-			assertEquals(HttpResponseStatus.OK, response.getStatus());
-			assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json"));
-			assertEquals("{\"jid\":\""+jid+"\",\"name\":\"Stoppable streaming test job\"," +
-					"\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"default\"," +
-					"\"job-parallelism\":-1,\"object-reuse-mode\":false,\"user-config\":{}}}", response.getContent());
-		}
-	}
-
-	@Test
-	public void testStopYarn() throws Exception {
-		// this only works if there is no active job at this point
-		assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty());
-		
-		// Create a task
-		final JobVertex sender = new JobVertex("Sender");
-		sender.setParallelism(2);
-		sender.setInvokableClass(StoppableInvokable.class);
-
-		final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
-		final JobID jid = jobGraph.getJobID();
-
-		cluster.submitJobDetached(jobGraph);
-
-		// wait for job to show up
-		while (cluster.getCurrentlyRunningJobsJava().isEmpty()) {
-			Thread.sleep(10);
-		}
-
-		final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
-		final Deadline deadline = testTimeout.fromNow();
-
-		while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
-			try (HttpTestClient client = new HttpTestClient("localhost", port)) {
-				// Request the file from the web server
-				client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft());
-	
-				HttpTestClient.SimpleHttpResponse response = client
-						.getNextResponse(deadline.timeLeft());
-	
-				assertEquals(HttpResponseStatus.OK, response.getStatus());
-				assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json"));
-				assertEquals("{}", response.getContent());
-			}
-			
-			Thread.sleep(20);
-		}
-	}
-}


[3/4] flink git commit: [FLINK-7052][blob] remove unused NAME_ADDRESSABLE mode

Posted by ch...@apache.org.
[FLINK-7052][blob] remove unused NAME_ADDRESSABLE mode

This closes #4158.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f68856af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f68856af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f68856af

Branch: refs/heads/master
Commit: f68856afb0628adcc7df223ac520df6abdf716cb
Parents: 211d096
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Jun 16 10:51:04 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 11 18:19:25 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/blob/BlobClient.java   | 252 ++-----------------
 .../apache/flink/runtime/blob/BlobServer.java   |  27 --
 .../runtime/blob/BlobServerConnection.java      | 217 +++-------------
 .../flink/runtime/blob/BlobServerProtocol.java  |  16 +-
 .../apache/flink/runtime/blob/BlobStore.java    |  22 --
 .../apache/flink/runtime/blob/BlobUtils.java    |  80 +-----
 .../org/apache/flink/runtime/blob/BlobView.java |  10 -
 .../flink/runtime/blob/FileSystemBlobStore.java |  15 --
 .../flink/runtime/blob/VoidBlobStore.java       |  11 -
 .../flink/runtime/blob/BlobClientSslTest.java   |  49 ----
 .../flink/runtime/blob/BlobClientTest.java      |  96 -------
 .../flink/runtime/blob/BlobRecoveryITCase.java  |  26 --
 .../runtime/blob/BlobServerDeleteTest.java      | 213 +---------------
 .../flink/runtime/blob/BlobServerPutTest.java   | 133 +---------
 .../BlobLibraryCacheManagerTest.java            |  13 +-
 15 files changed, 83 insertions(+), 1097 deletions(-)
----------------------------------------------------------------------
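
With NAME_ADDRESSABLE and JOB_ID_SCOPE gone, every request header identifies a BLOB solely by its content hash (the BlobKey). A rough sketch of the simplified GET framing; the opcode values below are illustrative stand-ins, not the actual BlobServerProtocol constants:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    public class GetHeaderSketch {
        // illustrative stand-ins for BlobServerProtocol constants; real values may differ
        private static final int GET_OPERATION = 1;
        private static final int CONTENT_ADDRESSABLE = 0;

        static void sendGetHeader(OutputStream out, byte[] blobKeyBytes) throws IOException {
            out.write(GET_OPERATION);       // which operation
            out.write(CONTENT_ADDRESSABLE); // the only addressing mode left
            out.write(blobKeyBytes);        // the content hash identifies the BLOB
        }

        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            sendGetHeader(out, new byte[] {0x12, 0x34});
            System.out.println(out.size()); // 4 bytes: opcode + mode + 2-byte key
        }
    }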


http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 0f02e75..ce59d75 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -55,9 +55,6 @@ import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_ID_SCOPE;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.MAX_KEY_LENGTH;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.NAME_ADDRESSABLE;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
@@ -139,46 +136,6 @@ public final class BlobClient implements Closeable {
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Downloads the BLOB identified by the given job ID and key from the BLOB server. If no such BLOB exists on the
-	 * server, a {@link FileNotFoundException} is thrown.
-	 * 
-	 * @param jobID
-	 *        the job ID identifying the BLOB to download
-	 * @param key
-	 *        the key identifying the BLOB to download
-	 * @return an input stream to read the retrieved data from
-	 * @throws IOException
-	 *         thrown if an I/O error occurs during the download
-	 */
-	public InputStream get(JobID jobID, String key) throws IOException {
-		if (key.length() > MAX_KEY_LENGTH) {
-			throw new IllegalArgumentException("Keys must not be longer than " + MAX_KEY_LENGTH);
-		}
-
-		if (this.socket.isClosed()) {
-			throw new IllegalStateException("BLOB Client is not connected. " +
-					"Client has been shut down or encountered an error before.");
-		}
-		if (LOG.isDebugEnabled()) {
-			LOG.debug(String.format("GET BLOB %s / \"%s\" from %s", jobID, key, socket.getLocalSocketAddress()));
-		}
-
-		try {
-			OutputStream os = this.socket.getOutputStream();
-			InputStream is = this.socket.getInputStream();
-
-			sendGetHeader(os, jobID, key, null);
-			receiveAndCheckResponse(is);
-
-			return new BlobInputStream(is, null);
-		}
-		catch (Throwable t) {
-			BlobUtils.closeSilently(socket, LOG);
-			throw new IOException("GET operation failed: " + t.getMessage(), t);
-		}
-	}
-
-	/**
 	 * Downloads the BLOB identified by the given BLOB key from the BLOB server. If no such BLOB exists on the server, a
 	 * {@link FileNotFoundException} is thrown.
 	 * 
@@ -202,7 +159,7 @@ public final class BlobClient implements Closeable {
 			InputStream is = this.socket.getInputStream();
 
 			// Send GET header
-			sendGetHeader(os, null, null, blobKey);
+			sendGetHeader(os, null, blobKey);
 			receiveAndCheckResponse(is);
 
 			return new BlobInputStream(is, blobKey);
@@ -221,33 +178,23 @@ public final class BlobClient implements Closeable {
 	 * @param jobID
 	 *        the job ID identifying the BLOB to download or <code>null</code> to indicate the BLOB key should be used
 	 *        to identify the BLOB on the server instead
-	 * @param key
-	 *        the key identifying the BLOB to download or <code>null</code> to indicate the BLOB key should be used to
-	 *        identify the BLOB on the server instead
 	 * @param blobKey
 	 *        the BLOB key to identify the BLOB to download if either the job ID or the regular key are
 	 *        <code>null</code>
 	 * @throws IOException
 	 *         thrown if an I/O error occurs while writing the header data to the output stream
 	 */
-	private void sendGetHeader(OutputStream outputStream, JobID jobID, String key, BlobKey blobKey) throws IOException {
+	private void sendGetHeader(OutputStream outputStream, JobID jobID, BlobKey blobKey) throws IOException {
+		checkArgument(jobID == null);
 
 		// Signal type of operation
 		outputStream.write(GET_OPERATION);
 
 		// Check if GET should be done in content-addressable manner
-		if (jobID == null || key == null) {
+		if (jobID == null) {
 			outputStream.write(CONTENT_ADDRESSABLE);
 			blobKey.writeToOutputStream(outputStream);
 		}
-		else {
-			outputStream.write(NAME_ADDRESSABLE);
-			// Send job ID and key
-			outputStream.write(jobID.getBytes());
-			byte[] keyBytes = key.getBytes(BlobUtils.DEFAULT_CHARSET);
-			writeLength(keyBytes.length, outputStream);
-			outputStream.write(keyBytes);
-		}
 	}
 
 	private void receiveAndCheckResponse(InputStream is) throws IOException {
@@ -296,68 +243,7 @@ public final class BlobClient implements Closeable {
 	 *         thrown if an I/O error occurs while uploading the data to the BLOB server
 	 */
 	public BlobKey put(byte[] value, int offset, int len) throws IOException {
-		return putBuffer(null, null, value, offset, len);
-	}
-
-	/**
-	 * Uploads the data of the given byte array to the BLOB server and stores it under the given job ID and key.
-	 *
-	 * @param jobId
-	 *        the job ID to identify the uploaded data
-	 * @param key
-	 *        the key to identify the uploaded data
-	 * @param value
-	 *        the buffer to upload
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while uploading the data to the BLOB server
-	 */
-	public void put(JobID jobId, String key, byte[] value) throws IOException {
-		put(jobId, key, value, 0, value.length);
-	}
-
-	/**
-	 * Uploads data from the given byte array to the BLOB server and stores it under the given job ID and key.
-	 *
-	 * @param jobId
-	 *        the job ID to identify the uploaded data
-	 * @param key
-	 *        the key to identify the uploaded data
-	 * @param value
-	 *        the buffer to upload data from
-	 * @param offset
-	 *        the read offset within the buffer
-	 * @param len
-	 *        the number of bytes to upload from the buffer
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while uploading the data to the BLOB server
-	 */
-	public void put(JobID jobId, String key, byte[] value, int offset, int len) throws IOException {
-		if (key.length() > MAX_KEY_LENGTH) {
-			throw new IllegalArgumentException("Keys must not be longer than " + MAX_KEY_LENGTH);
-		}
-
-		putBuffer(jobId, key, value, offset, len);
-	}
-
-	/**
-	 * Uploads data from the given input stream to the BLOB server and stores it under the given job ID and key.
-	 *
-	 * @param jobId
-	 *        the job ID to identify the uploaded data
-	 * @param key
-	 *        the key to identify the uploaded data
-	 * @param inputStream
-	 *        the input stream to read the data from
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while reading the data from the input stream or uploading the data to the
-	 *         BLOB server
-	 */
-	public void put(JobID jobId, String key, InputStream inputStream) throws IOException {
-		if (key.length() > MAX_KEY_LENGTH) {
-			throw new IllegalArgumentException("Keys must not be longer than " + MAX_KEY_LENGTH);
-		}
-
-		putInputStream(jobId, key, inputStream);
+		return putBuffer(null, value, offset, len);
 	}
 
 	/**
@@ -371,7 +257,7 @@ public final class BlobClient implements Closeable {
 	 *         BLOB server
 	 */
 	public BlobKey put(InputStream inputStream) throws IOException {
-		return putInputStream(null, null, inputStream);
+		return putInputStream(null, inputStream);
 	}
 
 	/**
@@ -380,9 +266,6 @@ public final class BlobClient implements Closeable {
 	 * @param jobId
 	 *        the ID of the job the BLOB belongs to or <code>null</code> to store the BLOB in a content-addressable
 	 *        manner
-	 * @param key
-	 *        the key to identify the BLOB on the server or <code>null</code> to store the BLOB in a content-addressable
-	 *        manner
 	 * @param value
 	 *        the buffer to read the data from
 	 * @param offset
@@ -394,7 +277,7 @@ public final class BlobClient implements Closeable {
 	 * @throws IOException
 	 *         thrown if an I/O error occurs while uploading the data to the BLOB server
 	 */
-	private BlobKey putBuffer(JobID jobId, String key, byte[] value, int offset, int len) throws IOException {
+	private BlobKey putBuffer(JobID jobId, byte[] value, int offset, int len) throws IOException {
 		if (this.socket.isClosed()) {
 			throw new IllegalStateException("BLOB Client is not connected. " +
 					"Client has been shut down or encountered an error before.");
@@ -404,9 +287,6 @@ public final class BlobClient implements Closeable {
 			if (jobId == null) {
 				LOG.debug(String.format("PUT content addressable BLOB buffer (%d bytes) to %s",
 						len, socket.getLocalSocketAddress()));
-			} else {
-				LOG.debug(String.format("PUT BLOB buffer (%d bytes) under %s / \"%s\" to %s",
-						len, jobId, key, socket.getLocalSocketAddress()));
 			}
 		}
 
@@ -415,7 +295,7 @@ public final class BlobClient implements Closeable {
 			final MessageDigest md = jobId == null ? BlobUtils.createMessageDigest() : null;
 
 			// Send the PUT header
-			sendPutHeader(os, jobId, key);
+			sendPutHeader(os, jobId);
 
 			// Send the value in iterations of BUFFER_SIZE
 			int remainingBytes = len;
@@ -453,9 +333,6 @@ public final class BlobClient implements Closeable {
 	 * @param jobId
 	 *        the ID of the job the BLOB belongs to or <code>null</code> to store the BLOB in a content-addressable
 	 *        manner
-	 * @param key
-	 *        the key to identify the BLOB on the server or <code>null</code> to store the BLOB in a content-addressable
-	 *        manner
 	 * @param inputStream
 	 *        the input stream to read the data from
 	 * @return the computed BLOB key if the BLOB has been stored in a content-addressable manner, <code>null</code>
@@ -463,7 +340,7 @@ public final class BlobClient implements Closeable {
 	 * @throws IOException
 	 *         thrown if an I/O error occurs while uploading the data to the BLOB server
 	 */
-	private BlobKey putInputStream(JobID jobId, String key, InputStream inputStream) throws IOException {
+	private BlobKey putInputStream(JobID jobId, InputStream inputStream) throws IOException {
 		if (this.socket.isClosed()) {
 			throw new IllegalStateException("BLOB Client is not connected. " +
 					"Client has been shut down or encountered an error before.");
@@ -473,9 +350,6 @@ public final class BlobClient implements Closeable {
 			if (jobId == null) {
 				LOG.debug(String.format("PUT content addressable BLOB stream to %s",
 						socket.getLocalSocketAddress()));
-			} else {
-				LOG.debug(String.format("PUT BLOB stream under %s / \"%s\" to %s",
-						jobId, key, socket.getLocalSocketAddress()));
 			}
 		}
 
@@ -485,7 +359,7 @@ public final class BlobClient implements Closeable {
 			final byte[] xferBuf = new byte[BUFFER_SIZE];
 
 			// Send the PUT header
-			sendPutHeader(os, jobId, key);
+			sendPutHeader(os, jobId);
 
 			while (true) {
 				final int read = inputStream.read(xferBuf);
@@ -551,33 +425,17 @@ public final class BlobClient implements Closeable {
 	 * @param jobID
 	 *        the ID of job the BLOB belongs to or <code>null</code> to indicate the upload of a
 	 *        content-addressable BLOB
-	 * @param key
-	 *        the key of the BLOB to upload or <code>null</code> to indicate the upload of a content-addressable BLOB
 	 * @throws IOException
 	 *         thrown if an I/O error occurs while writing the header data to the output stream
 	 */
-	private void sendPutHeader(OutputStream outputStream, JobID jobID, String key) throws IOException {
-		// sanity check that either both are null or both are not null
-		if ((jobID != null || key != null) && !(jobID != null && key != null)) {
-			throw new IllegalArgumentException();
-		}
+	private void sendPutHeader(OutputStream outputStream, JobID jobID) throws IOException {
+		checkArgument(jobID == null);
 
 		// Signal type of operation
 		outputStream.write(PUT_OPERATION);
 
 		// Check if PUT should be done in content-addressable manner
-		if (jobID == null) {
-			outputStream.write(CONTENT_ADDRESSABLE);
-		}
-		else {
-			outputStream.write(NAME_ADDRESSABLE);
-			// Send job ID and the key
-			byte[] idBytes = jobID.getBytes();
-			byte[] keyBytes = key.getBytes(BlobUtils.DEFAULT_CHARSET);
-			outputStream.write(idBytes);
-			writeLength(keyBytes.length, outputStream);
-			outputStream.write(keyBytes);
-		}
+		outputStream.write(CONTENT_ADDRESSABLE);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -587,67 +445,14 @@ public final class BlobClient implements Closeable {
 	/**
 	 * Deletes the BLOB identified by the given BLOB key from the BLOB server.
 	 *
-	 * @param key
+	 * @param blobKey
 	 *        the key to identify the BLOB
 	 * @throws IOException
 	 *         thrown if an I/O error occurs while transferring the request to
 	 *         the BLOB server or if the BLOB server cannot delete the file
 	 */
-	public void delete(BlobKey key) throws IOException {
-		checkArgument(key != null, "BLOB key must not be null.");
-
-		deleteInternal(null, null, key);
-	}
-
-	/**
-	 * Deletes the BLOB identified by the given job ID and key from the BLOB server.
-	 *
-	 * @param jobId
-	 *        the job ID to identify the BLOB
-	 * @param key
-	 *        the key to identify the BLOB
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while transferring the request to the BLOB server
-	 */
-	public void delete(JobID jobId, String key) throws IOException {
-		checkArgument(jobId != null, "Job id must not be null.");
-		checkArgument(key != null, "BLOB name must not be null.");
-		
-		if (key.length() > MAX_KEY_LENGTH) {
-			throw new IllegalArgumentException("Keys must not be longer than " + MAX_KEY_LENGTH);
-		}
-
-		deleteInternal(jobId, key, null);
-	}
-
-	/**
-	 * Deletes all BLOBs belonging to the job with the given ID from the BLOB server
-	 *
-	 * @param jobId
-	 *        the job ID to identify the BLOBs to be deleted
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while transferring the request to the BLOB server
-	 */
-	public void deleteAll(JobID jobId) throws IOException {
-		checkArgument(jobId != null, "Job id must not be null.");
-
-		deleteInternal(jobId, null, null);
-	}
-
-	/**
-	 * Delete one or multiple BLOBs from the BLOB server.
-	 *
-	 * @param jobId The job ID to identify the BLOB(s) to be deleted.
-	 * @param key The key to identify the specific BLOB to delete or <code>null</code> to delete
-	 *            all BLOBs associated with the job id.
-	 * @param bKey The blob key to identify a specific content addressable BLOB. This parameter
-	 *             is exclusive with jobId and key.
-	 * @throws IOException Thrown if an I/O error occurs while transferring the request to the BLOB server.
-	 */
-	private void deleteInternal(JobID jobId, String key, BlobKey bKey) throws IOException {
-		if ((jobId != null && bKey != null) || (jobId == null && bKey == null)) {
-			throw new IllegalArgumentException();
-		}
+	public void delete(BlobKey blobKey) throws IOException {
+		checkArgument(blobKey != null, "BLOB key must not be null.");
 
 		try {
 			final OutputStream outputStream = this.socket.getOutputStream();
@@ -656,28 +461,9 @@ public final class BlobClient implements Closeable {
 			// Signal type of operation
 			outputStream.write(DELETE_OPERATION);
 
-			// Check if DELETE should be done in content-addressable manner
-			if (jobId == null) {
-				// delete blob key
-				outputStream.write(CONTENT_ADDRESSABLE);
-				bKey.writeToOutputStream(outputStream);
-			}
-			else if (key != null) {
-				// delete BLOB for jobID and name key
-				outputStream.write(NAME_ADDRESSABLE);
-				// Send job ID and the key
-				byte[] idBytes = jobId.getBytes();
-				byte[] keyBytes = key.getBytes(BlobUtils.DEFAULT_CHARSET);
-				outputStream.write(idBytes);
-				writeLength(keyBytes.length, outputStream);
-				outputStream.write(keyBytes);
-			}
-			else {
-				// delete all blobs for JobID
-				outputStream.write(JOB_ID_SCOPE);
-				byte[] idBytes = jobId.getBytes();
-				outputStream.write(idBytes);
-			}
+			// delete blob key
+			outputStream.write(CONTENT_ADDRESSABLE);
+			blobKey.writeToOutputStream(outputStream);
 
 			int response = inputStream.read();
 			if (response < 0) {
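
After this commit the client's public surface is purely content-addressable: put returns the computed BlobKey, and get/delete accept only that key. A hedged usage sketch; the BlobClient(InetSocketAddress, Configuration) constructor and the server port are assumptions, not taken from this diff:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.blob.BlobClient;
    import org.apache.flink.runtime.blob.BlobKey;

    import java.io.InputStream;
    import java.net.InetSocketAddress;

    public class BlobClientUsageSketch {
        public static void main(String[] args) throws Exception {
            // assumed constructor and port; adjust to the running BlobServer
            InetSocketAddress serverAddress = new InetSocketAddress("localhost", 12345);
            try (BlobClient client = new BlobClient(serverAddress, new Configuration())) {
                byte[] data = {1, 2, 3};

                // PUT: upload bytes; the returned key is the content hash
                BlobKey key = client.put(data, 0, data.length);

                // GET: retrieve by key only; jobID/name lookups no longer exist
                try (InputStream in = client.get(key)) {
                    System.out.println(in.read()); // 1
                }

                // DELETE: likewise addressed by the content hash
                client.delete(key);
            }
        }
    }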

http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 3b3a39b..ecb4527 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.blob;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -26,7 +25,6 @@ import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.NetUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -200,31 +198,6 @@ public class BlobServer extends Thread implements BlobService {
 	}
 
 	/**
-	 * Returns a file handle to the file identified by the given jobID and key.
-	 *
-	 * <p><strong>This is only called from the {@link BlobServerConnection}</strong>
-	 *
-	 * @param jobID to which the file is associated
-	 * @param key to identify the file within the job context
-	 * @return file handle to the file
-	 */
-	File getStorageLocation(JobID jobID, String key) {
-		return BlobUtils.getStorageLocation(storageDir, jobID, key);
-	}
-
-	/**
-	 * Method which deletes all files associated with the given jobID.
-	 *
-	 * <p><strong>This is only called from the {@link BlobServerConnection}</strong>
-	 *
-	 * @param jobID all files associated to this jobID will be deleted
-	 * @throws IOException
-	 */
-	void deleteJobDirectory(JobID jobID) throws IOException {
-		BlobUtils.deleteJobDirectory(storageDir, jobID);
-	}
-
-	/**
 	 * Returns a temporary file inside the BLOB server's incoming directory.
 	 *
 	 * @return a temporary file inside the BLOB server's incoming directory

http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index ad33c5d..181211d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -42,9 +42,6 @@ import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_ID_SCOPE;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.MAX_KEY_LENGTH;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.NAME_ADDRESSABLE;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
@@ -112,7 +109,6 @@ class BlobServerConnection extends Thread {
 		try {
 			final InputStream inputStream = this.clientSocket.getInputStream();
 			final OutputStream outputStream = this.clientSocket.getOutputStream();
-			final byte[] buffer = new byte[BUFFER_SIZE];
 
 			while (true) {
 				// Read the requested operation
@@ -124,13 +120,13 @@ class BlobServerConnection extends Thread {
 
 				switch (operation) {
 				case PUT_OPERATION:
-					put(inputStream, outputStream, buffer);
+					put(inputStream, outputStream, new byte[BUFFER_SIZE]);
 					break;
 				case GET_OPERATION:
-					get(inputStream, outputStream, buffer);
+					get(inputStream, outputStream, new byte[BUFFER_SIZE]);
 					break;
 				case DELETE_OPERATION:
-					delete(inputStream, outputStream, buffer);
+					delete(inputStream, outputStream);
 					break;
 				default:
 					throw new IOException("Unknown operation " + operation);
@@ -182,7 +178,7 @@ class BlobServerConnection extends Thread {
 	 *         thrown if an I/O error occurs while reading/writing data from/to the respective streams
 	 */
 	private void get(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
-		/**
+		/*
 		 * Retrieve the file from the (distributed?) BLOB store and store it
 		 * locally, then send it to the service which requested it.
 		 *
@@ -194,7 +190,6 @@ class BlobServerConnection extends Thread {
 		File blobFile;
 		int contentAddressable = -1;
 		JobID jobId = null;
-		String key = null;
 		BlobKey blobKey = null;
 
 		try {
@@ -203,16 +198,7 @@ class BlobServerConnection extends Thread {
 			if (contentAddressable < 0) {
 				throw new EOFException("Premature end of GET request");
 			}
-			if (contentAddressable == NAME_ADDRESSABLE) {
-				// Receive the job ID and key
-				byte[] jidBytes = new byte[JobID.SIZE];
-				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
-
-				jobId = JobID.fromByteArray(jidBytes);
-				key = readKey(buf, inputStream);
-				blobFile = blobServer.getStorageLocation(jobId, key);
-			}
-			else if (contentAddressable == CONTENT_ADDRESSABLE) {
+			if (contentAddressable == CONTENT_ADDRESSABLE) {
 				blobKey = BlobKey.readFromInputStream(inputStream);
 				blobFile = blobServer.getStorageLocation(blobKey);
 			}
@@ -248,13 +234,7 @@ class BlobServerConnection extends Thread {
 						if (blobFile.exists()) {
 							LOG.debug("Blob file {} has downloaded from the BlobStore by a different connection.", blobFile);
 						} else {
-							if (contentAddressable == NAME_ADDRESSABLE) {
-								blobStore.get(jobId, key, blobFile);
-							} else if (contentAddressable == CONTENT_ADDRESSABLE) {
-								blobStore.get(blobKey, blobFile);
-							} else {
-								throw new IOException("Unknown type of BLOB addressing: " + contentAddressable + '.');
-							}
+							blobStore.get(blobKey, blobFile);
 						}
 					} finally {
 						writeLock.unlock();
@@ -321,7 +301,6 @@ class BlobServerConnection extends Thread {
 	 */
 	private void put(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
 		JobID jobID = null;
-		String key = null;
 		MessageDigest md = null;
 
 		File incomingFile = null;
@@ -333,14 +312,7 @@ class BlobServerConnection extends Thread {
 				throw new EOFException("Premature end of PUT request");
 			}
 
-			if (contentAddressable == NAME_ADDRESSABLE) {
-				// Receive the job ID and key
-				byte[] jidBytes = new byte[JobID.SIZE];
-				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
-				jobID = JobID.fromByteArray(jidBytes);
-				key = readKey(buf, inputStream);
-			}
-			else if (contentAddressable == CONTENT_ADDRESSABLE) {
+			if (contentAddressable == CONTENT_ADDRESSABLE) {
 				md = BlobUtils.createMessageDigest();
 			}
 			else {
@@ -348,11 +320,7 @@ class BlobServerConnection extends Thread {
 			}
 
 			if (LOG.isDebugEnabled()) {
-				if (contentAddressable == NAME_ADDRESSABLE) {
-					LOG.debug(String.format("Received PUT request for BLOB under %s / \"%s\"", jobID, key));
-				} else {
-					LOG.debug("Received PUT request for content addressable BLOB");
-				}
+				LOG.debug("Received PUT request for content addressable BLOB");
 			}
 
 			incomingFile = blobServer.createTemporaryFilename();
@@ -377,89 +345,47 @@ class BlobServerConnection extends Thread {
 			}
 			fos.close();
 
-			if (contentAddressable == NAME_ADDRESSABLE) {
-				File storageFile = blobServer.getStorageLocation(jobID, key);
+			BlobKey blobKey = new BlobKey(md.digest());
+			File storageFile = blobServer.getStorageLocation(blobKey);
 
-				writeLock.lock();
+			writeLock.lock();
 
-				try {
-					// first check whether the file already exists
-					if (!storageFile.exists()) {
-						try {
-							// only move the file if it does not yet exist
-							Files.move(incomingFile.toPath(), storageFile.toPath());
-
-							incomingFile = null;
-
-						} catch (FileAlreadyExistsException ignored) {
-							LOG.warn("Detected concurrent file modifications. This should only happen if multiple" +
-								"BlobServer use the same storage directory.");
-							// we cannot be sure at this point whether the file has already been uploaded to the blob
-							// store or not. Even if the blobStore might shortly be in an inconsistent state, we have
-							// persist the blob. Otherwise we might not be able to recover the job.
-						}
-
-						// only the one moving the incoming file to its final destination is allowed to upload the
-						// file to the blob store
-						blobStore.put(storageFile, jobID, key);
-					}
-				} catch(IOException ioe) {
-					// we failed to either create the local storage file or to upload it --> try to delete the local file
-					// while still having the write lock
-					if (storageFile.exists() && !storageFile.delete()) {
-						LOG.warn("Could not delete the storage file.");
-					}
-
-					throw ioe;
-				} finally {
-					writeLock.unlock();
-				}
-
-				outputStream.write(RETURN_OKAY);
-			}
-			else {
-				BlobKey blobKey = new BlobKey(md.digest());
-				File storageFile = blobServer.getStorageLocation(blobKey);
-
-				writeLock.lock();
+			try {
+				// first check whether the file already exists
+				if (!storageFile.exists()) {
+					try {
+						// only move the file if it does not yet exist
+						Files.move(incomingFile.toPath(), storageFile.toPath());
 
-				try {
-					// first check whether the file already exists
-					if (!storageFile.exists()) {
-						try {
-							// only move the file if it does not yet exist
-							Files.move(incomingFile.toPath(), storageFile.toPath());
-
-							incomingFile = null;
-
-						} catch (FileAlreadyExistsException ignored) {
-							LOG.warn("Detected concurrent file modifications. This should only happen if multiple" +
-								"BlobServer use the same storage directory.");
-							// we cannot be sure at this point whether the file has already been uploaded to the blob
-							// store or not. Even if the blobStore might shortly be in an inconsistent state, we have
-							// persist the blob. Otherwise we might not be able to recover the job.
-						}
+						incomingFile = null;
 
-						// only the one moving the incoming file to its final destination is allowed to upload the
-						// file to the blob store
-						blobStore.put(storageFile, blobKey);
-					}
-				} catch(IOException ioe) {
-					// we failed to either create the local storage file or to upload it --> try to delete the local file
-					// while still having the write lock
-					if (storageFile.exists() && !storageFile.delete()) {
-						LOG.warn("Could not delete the storage file.");
+					} catch (FileAlreadyExistsException ignored) {
+					} catch (FileAlreadyExistsException ignored) {
+						LOG.warn("Detected concurrent file modifications. This should only happen if multiple " +
+							"BlobServers use the same storage directory.");
+						// we cannot be sure at this point whether the file has already been uploaded to the blob
+						// store or not. Even if the blobStore might shortly be in an inconsistent state, we have to
+						// persist the blob. Otherwise we might not be able to recover the job.
 					}
 
-					throw ioe;
-				} finally {
-					writeLock.unlock();
+					// only the one moving the incoming file to its final destination is allowed to upload the
+					// file to the blob store
+					blobStore.put(storageFile, blobKey);
+				}
+			} catch (IOException ioe) {
+				// we failed to either create the local storage file or to upload it --> try to delete the local file
+				// while still having the write lock
+				if (storageFile.exists() && !storageFile.delete()) {
+					LOG.warn("Could not delete the storage file.");
 				}
 
-				// Return computed key to client for validation
-				outputStream.write(RETURN_OKAY);
-				blobKey.writeToOutputStream(outputStream);
+				throw ioe;
+			} finally {
+				writeLock.unlock();
 			}
+
+			// Return computed key to client for validation
+			outputStream.write(RETURN_OKAY);
+			blobKey.writeToOutputStream(outputStream);
 		}
 		catch (SocketException e) {
 			// happens when the other side disconnects
@@ -499,7 +425,7 @@ class BlobServerConnection extends Thread {
 	 * @param outputStream The output stream to write the response to.
 	 * @throws java.io.IOException Thrown if an I/O error occurs while reading the request data from the input stream.
 	 */
-	private void delete(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
+	private void delete(InputStream inputStream, OutputStream outputStream) throws IOException {
 
 		try {
 			int type = inputStream.read();
@@ -511,46 +437,6 @@ class BlobServerConnection extends Thread {
 				BlobKey key = BlobKey.readFromInputStream(inputStream);
 				blobServer.delete(key);
 			}
-			else if (type == NAME_ADDRESSABLE) {
-				byte[] jidBytes = new byte[JobID.SIZE];
-				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
-				JobID jobID = JobID.fromByteArray(jidBytes);
-
-				String key = readKey(buf, inputStream);
-
-				File blobFile = this.blobServer.getStorageLocation(jobID, key);
-
-				writeLock.lock();
-
-				try {
-					// we should make the local and remote file deletion atomic, otherwise we might risk not
-					// removing the remote file in case of a concurrent put operation
-					if (blobFile.exists() && !blobFile.delete()) {
-						LOG.warn("Cannot delete local BLOB file " + blobFile.getAbsolutePath());
-					}
-
-					blobStore.delete(jobID, key);
-				} finally {
-					writeLock.unlock();
-				}
-			}
-			else if (type == JOB_ID_SCOPE) {
-				byte[] jidBytes = new byte[JobID.SIZE];
-				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
-				JobID jobID = JobID.fromByteArray(jidBytes);
-
-				writeLock.lock();
-
-				try {
-					// we should make the local and remote file deletion atomic, otherwise we might risk not
-					// removing the remote file in case of a concurrent put operation
-					blobServer.deleteJobDirectory(jobID);
-
-					blobStore.deleteAll(jobID);
-				} finally {
-					writeLock.unlock();
-				}
-			}
 			else {
 				throw new IOException("Unrecognized addressing type: " + type);
 			}
@@ -575,27 +461,6 @@ class BlobServerConnection extends Thread {
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Reads the key of a BLOB from the given input stream.
-	 * 
-	 * @param buf
-	 *        auxiliary buffer to data deserialization
-	 * @param inputStream
-	 *        the input stream to read the key from
-	 * @return the key of a BLOB
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while reading the key data from the input stream
-	 */
-	private static String readKey(byte[] buf, InputStream inputStream) throws IOException {
-		final int keyLength = readLength(inputStream);
-		if (keyLength > MAX_KEY_LENGTH) {
-			throw new IOException("Unexpected key length " + keyLength);
-		}
-
-		readFully(inputStream, buf, 0, keyLength, "BlobKey");
-		return new String(buf, 0, keyLength, BlobUtils.DEFAULT_CHARSET);
-	}
-
-	/**
 	 * Writes to the output stream the error return code, and the given exception in serialized form.
 	 *
	 * @param out The output stream to write to.

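The refactored put() path above serializes the local file move and the blob store upload under a single write lock, relying on the atomicity of Files.move() to detect concurrent writers. A minimal sketch of that pattern, assuming the BlobStore and BlobKey types from this package; the method and parameter names are illustrative, not part of the patch:

	// Sketch: move-then-upload under the write lock, tolerating concurrent movers.
	void moveAndUpload(File incomingFile, File storageFile, BlobKey blobKey,
			BlobStore blobStore, java.util.concurrent.locks.Lock writeLock) throws IOException {
		writeLock.lock();
		try {
			if (!storageFile.exists()) {
				try {
					// only move the file if it does not yet exist
					java.nio.file.Files.move(incomingFile.toPath(), storageFile.toPath());
				} catch (java.nio.file.FileAlreadyExistsException ignored) {
					// a concurrent writer already placed the file; still upload below,
					// since we cannot tell whether it reached the blob store
				}
				blobStore.put(storageFile, blobKey);
			}
		} finally {
			writeLock.unlock();
		}
	}
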
http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
index 6df7811..d8ac833 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
@@ -26,9 +26,6 @@ public class BlobServerProtocol {
 	/** The buffer size in bytes for network transfers. */
 	static final int BUFFER_SIZE = 65536; // 64 K
 
-	/** The maximum key length allowed for storing BLOBs. */
-	static final int MAX_KEY_LENGTH = 64;
-
 	/** Internal code to identify a PUT operation. */
 	static final byte PUT_OPERATION = 0;
 
@@ -44,15 +41,14 @@ public class BlobServerProtocol {
 	/** Internal code to identify an erroneous operation. */
 	static final byte RETURN_ERROR = 1;
 
-	/** Internal code to identify a reference via content hash as the key */
+	/**
+	 * Internal code to identify a reference via content hash as the key.
+	 * <p>
+ * Note: previously, there were also <tt>NAME_ADDRESSABLE</tt> (code <tt>1</tt>) and
+	 * <tt>JOB_ID_SCOPE</tt> (code <tt>2</tt>).
+	 */
 	static final byte CONTENT_ADDRESSABLE = 0;
 
-	/** Internal code to identify a reference via jobId and name as the key */
-	static final byte NAME_ADDRESSABLE = 1;
-
-	/** Internal code to identify a reference via jobId as the key */
-	static final byte JOB_ID_SCOPE = 2;
-
 	// --------------------------------------------------------------------------------------------
 
 	private BlobServerProtocol() {}

http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
index 4c26a5a..1e8b73a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
@@ -38,18 +38,6 @@ public interface BlobStore extends BlobView {
 	void put(File localFile, BlobKey blobKey) throws IOException;
 
 	/**
-	 * Copies a local file to the blob store.
-	 *
-	 * <p>The job ID and key make up a composite key for the file.
-	 *
-	 * @param localFile The file to copy
-	 * @param jobId     The JobID part of ID for the file in the blob store
-	 * @param key       The String part of ID for the file in the blob store
-	 * @throws IOException If the copy fails
-	 */
-	void put(File localFile, JobID jobId, String key) throws IOException;
-
-	/**
 	 * Tries to delete a blob from storage.
 	 *
 	 * <p>NOTE: This also tries to delete any created directories if empty.</p>
@@ -59,16 +47,6 @@ public interface BlobStore extends BlobView {
 	void delete(BlobKey blobKey);
 
 	/**
-	 * Tries to delete a blob from storage.
-	 *
-	 * <p>NOTE: This also tries to delete any created directories if empty.</p>
-	 *
-	 * @param jobId The JobID part of ID for the blob
-	 * @param key   The String part of ID for the blob
-	 */
-	void delete(JobID jobId, String key);
-
-	/**
 	 * Tries to delete all blobs for the given job from storage.
 	 *
 	 * <p>NOTE: This also tries to delete any created directories if empty.</p>

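With the name-addressable variants removed, BlobStore keys every file by its content hash alone. A hedged usage sketch (the storageFile and blobKey names are illustrative):

	blobStore.put(storageFile, blobKey);  // copy the local file into HA storage
	blobStore.delete(blobKey);            // best-effort delete; also prunes empty directories
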
http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 9d538cc..e8f3fe5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -18,16 +18,13 @@
 
 package org.apache.flink.runtime.blob;
 
-import com.google.common.io.BaseEncoding;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 
@@ -37,7 +34,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
-import java.nio.charset.Charset;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.UUID;
@@ -58,17 +54,12 @@ public class BlobUtils {
 	/**
 	 * The prefix of all BLOB files stored by the BLOB server.
 	 */
-	static final String BLOB_FILE_PREFIX = "blob_";
+	private static final String BLOB_FILE_PREFIX = "blob_";
 
 	/**
 	 * The prefix of all job-specific directories created by the BLOB server.
 	 */
-	static final String JOB_DIR_PREFIX = "job_";
-
-	/**
-	 * The default character set to translate between characters and bytes.
-	 */
-	static final Charset DEFAULT_CHARSET = ConfigConstants.DEFAULT_CHARSET;
+	private static final String JOB_DIR_PREFIX = "job_";
 
 	/**
 	 * Creates a BlobStore based on the parameters set in the configuration.
@@ -205,60 +196,6 @@ public class BlobUtils {
 	}
 
 	/**
-	 * Returns the (designated) physical storage location of the BLOB with the given job ID and key.
-	 *
-	 * @param jobID
-	 *        the ID of the job the BLOB belongs to
-	 * @param key
-	 *        the key of the BLOB
-	 * @return the (designated) physical storage location of the BLOB with the given job ID and key
-	 */
-	static File getStorageLocation(File storageDir, JobID jobID, String key) {
-		return new File(getJobDirectory(storageDir, jobID), BLOB_FILE_PREFIX + encodeKey(key));
-	}
-
-	/**
-	 * Returns the BLOB server's storage directory for BLOBs belonging to the job with the given ID.
-	 *
-	 * @param jobID
-	 *        the ID of the job to return the storage directory for
-	 * @return the storage directory for BLOBs belonging to the job with the given ID
-	 */
-	private static File getJobDirectory(File storageDir, JobID jobID) {
-		final File jobDirectory = new File(storageDir, JOB_DIR_PREFIX + jobID.toString());
-
-		// note: thread-safe create should try to mkdir first and then ignore the case that the
-		//       directory already existed
-		if (!jobDirectory.mkdirs() && !jobDirectory.exists()) {
-			throw new RuntimeException("Could not create jobId directory '" + jobDirectory.getAbsolutePath() + "'.");
-		}
-
-		return jobDirectory;
-	}
-
-	/**
-	 * Translates the user's key for a BLOB into the internal name used by the BLOB server
-	 *
-	 * @param key
-	 *        the user's key for a BLOB
-	 * @return the internal name for the BLOB as used by the BLOB server
-	 */
-	static String encodeKey(String key) {
-		return BaseEncoding.base64().encode(key.getBytes(DEFAULT_CHARSET));
-	}
-
-	/**
-	 * Deletes the storage directory for the job with the given ID.
-	 *
-	 * @param jobID
-	 *			jobID whose directory shall be deleted
-	 */
-	static void deleteJobDirectory(File storageDir, JobID jobID) throws IOException {
-		File directory = getJobDirectory(storageDir, jobID);
-		FileUtils.deleteDirectory(directory);
-	}
-
-	/**
 	 * Creates a new instance of the message digest to use for the BLOB key computation.
 	 *
 	 * @return a new instance of the message digest to use for the BLOB key computation
@@ -409,19 +346,6 @@ public class BlobUtils {
 	}
 
 	/**
-	 * Returns the path for the given job ID and key.
-	 *
-	 * <p>The returned path can be used with the state backend for recovery purposes.
-	 *
-	 * <p>This follows the same scheme as {@link #getStorageLocation(File, JobID, String)}.
-	 */
-	static String getRecoveryPath(String basePath, JobID jobId, String key) {
-		// format: $base/job_$id/blob_$key
-		return String.format("%s/%s%s/%s%s", basePath, JOB_DIR_PREFIX, jobId.toString(),
-				BLOB_FILE_PREFIX, encodeKey(key));
-	}
-
-	/**
 	 * Returns the path for the given job ID.
 	 *
 	 * <p>The returned path can be used with the state backend for recovery purposes.

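Content-addressable keys are derived from a message digest over the uploaded bytes, matching the new BlobKey(md.digest()) call earlier in this patch. A minimal sketch of that derivation, assuming it runs inside the org.apache.flink.runtime.blob package so the package-private helpers are accessible (the inputStream variable is illustrative):

	java.security.MessageDigest md = BlobUtils.createMessageDigest();
	byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
	int read;
	while ((read = inputStream.read(buf)) >= 0) {
		md.update(buf, 0, read);  // hash the payload as it streams in
	}
	BlobKey blobKey = new BlobKey(md.digest());  // the content hash becomes the key
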
http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
index 11cf011..d174cf0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
@@ -36,14 +36,4 @@ public interface BlobView {
 	 * @throws IOException If the copy fails
 	 */
 	void get(BlobKey blobKey, File localFile) throws IOException;
-
-	/**
-	 * Copies a blob to a local file.
-	 *
-	 * @param jobId     The JobID part of ID for the blob
-	 * @param key       The String part of ID for the blob
-	 * @param localFile The local file to copy to
-	 * @throws IOException If the copy fails
-	 */
-	void get(JobID jobId, String key, File localFile) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index f3607d8..5f8058b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -68,11 +68,6 @@ public class FileSystemBlobStore implements BlobStoreService {
 		put(localFile, BlobUtils.getRecoveryPath(basePath, blobKey));
 	}
 
-	@Override
-	public void put(File localFile, JobID jobId, String key) throws IOException {
-		put(localFile, BlobUtils.getRecoveryPath(basePath, jobId, key));
-	}
-
 	private void put(File fromFile, String toBlobPath) throws IOException {
 		try (OutputStream os = fileSystem.create(new Path(toBlobPath), FileSystem.WriteMode.OVERWRITE)) {
 			LOG.debug("Copying from {} to {}.", fromFile, toBlobPath);
@@ -87,11 +82,6 @@ public class FileSystemBlobStore implements BlobStoreService {
 		get(BlobUtils.getRecoveryPath(basePath, blobKey), localFile);
 	}
 
-	@Override
-	public void get(JobID jobId, String key, File localFile) throws IOException {
-		get(BlobUtils.getRecoveryPath(basePath, jobId, key), localFile);
-	}
-
 	private void get(String fromBlobPath, File toFile) throws IOException {
 		checkNotNull(fromBlobPath, "Blob path");
 		checkNotNull(toFile, "File");
@@ -127,11 +117,6 @@ public class FileSystemBlobStore implements BlobStoreService {
 	}
 
 	@Override
-	public void delete(JobID jobId, String key) {
-		delete(BlobUtils.getRecoveryPath(basePath, jobId, key));
-	}
-
-	@Override
 	public void deleteAll(JobID jobId) {
 		delete(BlobUtils.getRecoveryPath(basePath, jobId));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
index c14d082..6e2bb53 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
@@ -32,27 +32,16 @@ public class VoidBlobStore implements BlobStoreService {
 	public void put(File localFile, BlobKey blobKey) throws IOException {
 	}
 
-	@Override
-	public void put(File localFile, JobID jobId, String key) throws IOException {
-	}
 
 	@Override
 	public void get(BlobKey blobKey, File localFile) throws IOException {
 	}
 
 	@Override
-	public void get(JobID jobId, String key, File localFile) throws IOException {
-	}
-
-	@Override
 	public void delete(BlobKey blobKey) {
 	}
 
 	@Override
-	public void delete(JobID jobId, String key) {
-	}
-
-	@Override
 	public void deleteAll(JobID jobId) {
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
index 340ac42..a5189ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
@@ -237,55 +237,6 @@ public class BlobClientSslTest extends TestLogger {
 	}
 
 	/**
-	 * Tests the PUT/GET operations for regular (non-content-addressable) streams.
-	 */
-	@Test
-	public void testRegularStream() {
-
-		final JobID jobID = JobID.generate();
-		final String key = "testkey3";
-
-		try {
-			final File testFile = File.createTempFile("testfile", ".dat");
-			testFile.deleteOnExit();
-			prepareTestFile(testFile);
-
-			BlobClient client = null;
-			InputStream is = null;
-			try {
-
-				final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SSL_SERVER.getPort());
-				client = new BlobClient(serverAddress, sslClientConfig);
-
-				// Store the data
-				is = new FileInputStream(testFile);
-				client.put(jobID, key, is);
-
-				is.close();
-				is = null;
-
-				// Retrieve the data
-				is = client.get(jobID, key);
-				validateGet(is, testFile);
-
-			}
-			finally {
-				if (is != null) {
-					is.close();
-				}
-				if (client != null) {
-					client.close();
-				}
-			}
-
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
 	 * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, List)} helper.
 	 */
 	private void uploadJarFile(BlobServer blobServer, Configuration blobClientConfig) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index fda4ee9..0a8f738 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -286,102 +286,6 @@ public class BlobClientTest {
 	}
 
 	/**
-	 * Tests the PUT/GET operations for regular (non-content-addressable) buffers.
-	 */
-	@Test
-	public void testRegularBuffer() {
-
-		final byte[] testBuffer = createTestBuffer();
-		final JobID jobID = JobID.generate();
-		final String key = "testkey";
-
-		try {
-			BlobClient client = null;
-			try {
-				final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort());
-				client = new BlobClient(serverAddress, blobServiceConfig);
-
-				// Store the data
-				client.put(jobID, key, testBuffer);
-
-				// Retrieve the data
-				final InputStream is = client.get(jobID, key);
-				validateGet(is, testBuffer);
-
-				// Delete the data
-				client.delete(jobID, key);
-				
-				// Check if the BLOB is still available
-				try {
-					client.get(jobID, key);
-					fail("Expected IOException did not occur");
-				}
-				catch (IOException e) {
-					// expected
-				}
-			}
-			finally {
-				if (client != null) {
-					client.close();
-				}
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Tests the PUT/GET operations for regular (non-content-addressable) streams.
-	 */
-	@Test
-	public void testRegularStream() {
-
-		final JobID jobID = JobID.generate();
-		final String key = "testkey3";
-
-		try {
-			final File testFile = File.createTempFile("testfile", ".dat");
-			testFile.deleteOnExit();
-			prepareTestFile(testFile);
-
-			BlobClient client = null;
-			InputStream is = null;
-			try {
-
-				final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort());
-				client = new BlobClient(serverAddress, blobServiceConfig);
-
-				// Store the data
-				is = new FileInputStream(testFile);
-				client.put(jobID, key, is);
-
-				is.close();
-				is = null;
-
-				// Retrieve the data
-				is = client.get(jobID, key);
-				validateGet(is, testFile);
-
-			}
-			finally {
-				if (is != null) {
-					is.close();
-				}
-				if (client != null) {
-					client.close();
-				}
-			}
-
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
 	 * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, List)} helper.
 	 */
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index 4f12ddb..c2a3a7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -97,10 +97,6 @@ public class BlobRecoveryITCase extends TestLogger {
 			keys[1] = client.put(expected, 32, 256); // Request 2
 
 			JobID[] jobId = new JobID[] { new JobID(), new JobID() };
-			String[] testKey = new String[] { "test-key-1", "test-key-2" };
-
-			client.put(jobId[0], testKey[0], expected); // Request 3
-			client.put(jobId[1], testKey[1], expected, 32, 256); // Request 4
 
 			// check that the storage directory exists
 			final Path blobServerPath = new Path(storagePath, "blob");
@@ -132,31 +128,9 @@ public class BlobRecoveryITCase extends TestLogger {
 				}
 			}
 
-			// Verify request 3
-			try (InputStream is = client.get(jobId[0], testKey[0])) {
-				byte[] actual = new byte[expected.length];
-				BlobUtils.readFully(is, actual, 0, expected.length, null);
-
-				for (int i = 0; i < expected.length; i++) {
-					assertEquals(expected[i], actual[i]);
-				}
-			}
-
-			// Verify request 4
-			try (InputStream is = client.get(jobId[1], testKey[1])) {
-				byte[] actual = new byte[256];
-				BlobUtils.readFully(is, actual, 0, 256, null);
-
-				for (int i = 32, j = 0; i < 256; i++, j++) {
-					assertEquals(expected[i], actual[j]);
-				}
-			}
-
 			// Remove again
 			client.delete(keys[0]);
 			client.delete(keys[1]);
-			client.delete(jobId[0], testKey[0]);
-			client.delete(jobId[1], testKey[1]);
 
 			// Verify everything is clean
 			assertTrue("HA storage directory does not exist", fs.exists(new Path(storagePath)));

http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index 15e2c7a..7100e79 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
@@ -120,129 +119,6 @@ public class BlobServerDeleteTest extends TestLogger {
 	}
 
 	@Test
-	public void testDeleteSingleByName() {
-		BlobServer server = null;
-		BlobClient client = null;
-		BlobStore blobStore = new VoidBlobStore();
-
-		try {
-			Configuration config = new Configuration();
-			server = new BlobServer(config, blobStore);
-
-			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
-			client = new BlobClient(serverAddress, config);
-
-			byte[] data = new byte[2000000];
-			rnd.nextBytes(data);
-
-			JobID jobID = new JobID();
-			String name1 = "random name";
-			String name2 = "any nyme";
-
-			client.put(jobID, name1, data);
-			client.put(jobID, name2, data);
-
-			// issue a DELETE request via the client
-			client.delete(jobID, name1);
-			client.close();
-
-			client = new BlobClient(serverAddress, config);
-			try {
-				client.get(jobID, name1);
-				fail("BLOB should have been deleted");
-			}
-			catch (IOException e) {
-				// expected
-			}
-
-			try {
-				client.put(new byte[1]);
-				fail("client should be closed after erroneous operation");
-			}
-			catch (IllegalStateException e) {
-				// expected
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			cleanup(server, client);
-		}
-	}
-
-	@Test
-	public void testDeleteAll() {
-		BlobServer server = null;
-		BlobClient client = null;
-		BlobStore blobStore = new VoidBlobStore();
-
-		try {
-			Configuration config = new Configuration();
-			server = new BlobServer(config, blobStore);
-
-			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
-			client = new BlobClient(serverAddress, config);
-
-			byte[] data = new byte[2000000];
-			rnd.nextBytes(data);
-
-			JobID jobID = new JobID();
-			String name1 = "random name";
-			String name2 = "any nyme";
-
-			// put content addressable (like libraries)
-			client.put(jobID, name1, data);
-			client.put(jobID, name2, new byte[712]);
-			// items for a second (different!) job ID
-			final byte[] jobIdBytes = jobID.getBytes();
-			jobIdBytes[0] ^= 1;
-			JobID jobID2 = JobID.fromByteArray(jobIdBytes);
-			client.put(jobID2, name1, data);
-			client.put(jobID2, name2, new byte[712]);
-
-
-			// issue a DELETE ALL request via the client
-			client.deleteAll(jobID);
-			client.close();
-
-			client = new BlobClient(serverAddress, config);
-			try {
-				client.get(jobID, name1);
-				fail("BLOB should have been deleted");
-			}
-			catch (IOException e) {
-				// expected
-			}
-
-			try {
-				client.put(new byte[1]);
-				fail("client should be closed after erroneous operation");
-			}
-			catch (IllegalStateException e) {
-				// expected
-			}
-
-			client = new BlobClient(serverAddress, config);
-			try {
-				client.get(jobID, name2);
-				fail("BLOB should have been deleted");
-			}
-			catch (IOException e) {
-				// expected
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			cleanup(server, client);
-		}
-	}
-
-	@Test
 	public void testDeleteAlreadyDeletedByBlobKey() {
 		BlobServer server = null;
 		BlobClient client = null;
@@ -286,48 +162,7 @@ public class BlobServerDeleteTest extends TestLogger {
 	}
 
 	@Test
-	public void testDeleteAlreadyDeletedByName() {
-		BlobServer server = null;
-		BlobClient client = null;
-		BlobStore blobStore = new VoidBlobStore();
-
-		try {
-			Configuration config = new Configuration();
-			server = new BlobServer(config, blobStore);
-
-			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
-			client = new BlobClient(serverAddress, config);
-
-			byte[] data = new byte[2000000];
-			rnd.nextBytes(data);
-
-			JobID jid = new JobID();
-			String name = "------------fdghljEgRJHF+##4U789Q345";
-
-			client.put(jid, name, data);
-
-			File blobFile = server.getStorageLocation(jid, name);
-			assertTrue(blobFile.delete());
-
-			// issue a DELETE request via the client
-			try {
-				client.delete(jid, name);
-			}
-			catch (IOException e) {
-				fail("DELETE operation should not fail if file is already deleted");
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			cleanup(server, client);
-		}
-	}
-
-	@Test
-	public void testDeleteFailsByBlobKey() {
+	public void testDeleteByBlobKeyFails() {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
 		BlobServer server = null;
@@ -376,52 +211,6 @@ public class BlobServerDeleteTest extends TestLogger {
 		}
 	}
 
-	@Test
-	public void testDeleteByNameFails() {
-		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
-
-		BlobServer server = null;
-		BlobClient client = null;
-		BlobStore blobStore = new VoidBlobStore();
-
-		File blobFile = null;
-		File directory = null;
-		try {
-			Configuration config = new Configuration();
-			server = new BlobServer(config, blobStore);
-
-			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
-			client = new BlobClient(serverAddress, config);
-
-			byte[] data = new byte[2000000];
-			rnd.nextBytes(data);
-
-			JobID jid = new JobID();
-			String name = "------------fdghljEgRJHF+##4U789Q345";
-
-			client.put(jid, name, data);
-
-			blobFile = server.getStorageLocation(jid, name);
-			directory = blobFile.getParentFile();
-
-			assertTrue(blobFile.setWritable(false, false));
-			assertTrue(directory.setWritable(false, false));
-
-			// issue a DELETE request via the client
-			client.delete(jid, name);
-
-			// the file should still be there
-			client.get(jid, name);
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		} finally {
-			blobFile.setWritable(true, false);
-			directory.setWritable(true, false);
-			cleanup(server, client);
-		}
-	}
-
 	/**
 	 * FLINK-6020
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index 31e63a0..8b8ddf9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -44,7 +43,11 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
@@ -121,47 +124,6 @@ public class BlobServerPutTest extends TestLogger {
 		}
 	}
 
-	/**
-	 * Checked thread that calls {@link BlobServer#getStorageLocation(JobID, String)}
-	 */
-	public static class NameAddressableGetStorageLocation extends CheckedThread {
-		private final BlobServer server;
-		private final JobID jid;
-		private final String name;
-
-		public NameAddressableGetStorageLocation(BlobServer server, JobID jid, String name) {
-			this.server = server;
-			this.jid = jid;
-			this.name = name;
-		}
-
-		@Override
-		public void go() throws Exception {
-			server.getStorageLocation(jid, name);
-		}
-	}
-
-	/**
-	 * Tests concurrent calls to {@link BlobServer#getStorageLocation(JobID, String)}.
-	 */
-	@Test
-	public void testServerNameAddressableGetStorageLocationConcurrent() throws Exception {
-		BlobServer server = new BlobServer(new Configuration(), new VoidBlobStore());
-
-		try {
-			JobID jid = new JobID();
-			String stringKey = "my test key";
-			CheckedThread[] threads = new CheckedThread[] {
-				new NameAddressableGetStorageLocation(server, jid, stringKey),
-				new NameAddressableGetStorageLocation(server, jid, stringKey),
-				new NameAddressableGetStorageLocation(server, jid, stringKey)
-			};
-			checkedThreadSimpleTest(threads);
-		} finally {
-			server.close();
-		}
-	}
-
 	// --------------------------------------------------------------------------------------------
 
 	@Test
@@ -186,11 +148,6 @@ public class BlobServerPutTest extends TestLogger {
 			BlobKey key2 = client.put(data, 10, 44);
 			assertNotNull(key2);
 
-			// put under job and name scope
-			JobID jid = new JobID();
-			String stringKey = "my test key";
-			client.put(jid, stringKey, data);
-
 			// --- GET the data and check that it is equal ---
 
 			// one get request on the same client
@@ -212,12 +169,6 @@ public class BlobServerPutTest extends TestLogger {
 			BlobUtils.readFully(is2, result2, 0, result2.length, null);
 			is2.close();
 			assertArrayEquals(data, result2);
-
-			InputStream is3 = client.get(jid, stringKey);
-			byte[] result3 = new byte[data.length];
-			BlobUtils.readFully(is3, result3, 0, result3.length, null);
-			is3.close();
-			assertArrayEquals(data, result3);
 		} finally {
 			if (client != null) {
 				client.close();
@@ -248,14 +199,6 @@ public class BlobServerPutTest extends TestLogger {
 			{
 				BlobKey key1 = client.put(new ByteArrayInputStream(data));
 				assertNotNull(key1);
-
-			}
-
-			// put under job and name scope
-			{
-				JobID jid = new JobID();
-				String stringKey = "my test key";
-				client.put(jid, stringKey, new ByteArrayInputStream(data));
 			}
 		} finally {
 			if (client != null) {
@@ -290,14 +233,6 @@ public class BlobServerPutTest extends TestLogger {
 			{
 				BlobKey key1 = client.put(new ChunkedInputStream(data, 19));
 				assertNotNull(key1);
-
-			}
-
-			// put under job and name scope
-			{
-				JobID jid = new JobID();
-				String stringKey = "my test key";
-				client.put(jid, stringKey, new ChunkedInputStream(data, 17));
 			}
 		} finally {
 			if (client != null) {
@@ -364,64 +299,6 @@ public class BlobServerPutTest extends TestLogger {
 		}
 	}
 
-	@Test
-	public void testPutNamedBufferFails() throws IOException {
-		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
-
-		BlobServer server = null;
-		BlobClient client = null;
-
-		File tempFileDir = null;
-		try {
-			Configuration config = new Configuration();
-			server = new BlobServer(config, new VoidBlobStore());
-
-			// make sure the blob server cannot create any files in its storage dir
-			tempFileDir = server.createTemporaryFilename().getParentFile().getParentFile();
-			assertTrue(tempFileDir.setExecutable(true, false));
-			assertTrue(tempFileDir.setReadable(true, false));
-			assertTrue(tempFileDir.setWritable(false, false));
-
-			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
-			client = new BlobClient(serverAddress, config);
-
-			byte[] data = new byte[2000000];
-			rnd.nextBytes(data);
-
-			// put under job and name scope
-			try {
-				JobID jid = new JobID();
-				String stringKey = "my test key";
-				client.put(jid, stringKey, data);
-				fail("This should fail.");
-			}
-			catch (IOException e) {
-				assertTrue(e.getMessage(), e.getMessage().contains("Server side error"));
-			}
-
-			try {
-				JobID jid = new JobID();
-				String stringKey = "another key";
-				client.put(jid, stringKey, data);
-				fail("Client should be closed");
-			}
-			catch (IllegalStateException e) {
-				// expected
-			}
-		} finally {
-			// set writable again to make sure we can remove the directory
-			if (tempFileDir != null) {
-				tempFileDir.setWritable(true, false);
-			}
-			if (client != null) {
-				client.close();
-			}
-			if (server != null) {
-				server.close();
-			}
-		}
-	}
-
 	/**
 	 * FLINK-6020
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 73f5819..a727294 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -67,6 +67,8 @@ public class BlobLibraryCacheManagerTest {
 			buf[0] += 1;
 			keys.add(bc.put(buf));
 
+			bc.close();
+
 			long cleanupInterval = 1000l;
 			libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval);
 			libraryCacheManager.registerJob(jid, keys, Collections.<URL>emptyList());
@@ -96,13 +98,17 @@ public class BlobLibraryCacheManagerTest {
 			assertEquals(0, checkFilesExist(keys, libraryCacheManager, false));
 
 			try {
-				bc.get(jid, "test");
+				server.getURL(keys.get(0));
+				fail("BLOB should have been deleted");
+			} catch (IOException e) {
+				// expected
+			}
+			try {
+				server.getURL(keys.get(1));
 				fail("name-addressable BLOB should have been deleted");
 			} catch (IOException e) {
 				// expected
 			}
-
-			bc.close();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -180,7 +186,6 @@ public class BlobLibraryCacheManagerTest {
 			keys.add(bc.put(buf));
 			buf[0] += 1;
 			keys.add(bc.put(buf));
-			bc.put(jid, "test", buf);
 
 			long cleanupInterval = 1000l;
 			libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval);


[4/4] flink git commit: [FLINK-6965] Include snappy-java in flink-dist

Posted by ch...@apache.org.
[FLINK-6965] Include snappy-java in flink-dist

This closes #4160.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/021d27d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/021d27d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/021d27d5

Branch: refs/heads/master
Commit: 021d27d54960bedd58b00e89f7898a34f3991336
Parents: ed3b326
Author: zentol <ch...@apache.org>
Authored: Wed Jun 21 16:18:34 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 11 18:27:06 2017 +0200

----------------------------------------------------------------------
 flink-core/pom.xml           | 13 ++++++-------
 pom.xml                      |  6 ++++++
 tools/travis_mvn_watchdog.sh |  9 ++++++++-
 3 files changed, 20 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/021d27d5/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 5b42220..c6ba680 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -79,13 +79,12 @@ under the License.
 		<dependency>
 			<groupId>org.apache.avro</groupId>
 			<artifactId>avro</artifactId>
-			<!-- managed version -->
-			<exclusions>
-				<exclusion>
-					<groupId>org.xerial.snappy</groupId>
-					<artifactId>snappy-java</artifactId>
-				</exclusion>
-			</exclusions>
+		</dependency>
+
+		<!-- We explicitly depend on snappy since connectors that require it load it through the system class loader -->
+		<dependency>
+			<groupId>org.xerial.snappy</groupId>
+			<artifactId>snappy-java</artifactId>
 		</dependency>
 
 		<!-- ASM is needed for type extraction -->

http://git-wip-us.apache.org/repos/asf/flink/blob/021d27d5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 091769f..3de92c7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -267,6 +267,12 @@ under the License.
 				<version>1.8.2</version>
 			</dependency>
 
+			<dependency>
+				<groupId>org.xerial.snappy</groupId>
+				<artifactId>snappy-java</artifactId>
+				<version>1.1.1.3</version>
+			</dependency>
+
 			<!-- Make sure we use a consistent commons-cli version throughout the project -->
 			<dependency>
 				<groupId>commons-cli</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/021d27d5/tools/travis_mvn_watchdog.sh
----------------------------------------------------------------------
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index f3c1699..b22bfe0 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -165,7 +165,7 @@ watchdog () {
 	done
 }
 
-# Check the final fat jar for illegal artifacts
+# Check the final fat jar for illegal or missing artifacts
 check_shaded_artifacts() {
 	jar tf build-target/lib/flink-dist*.jar > allClasses
 	ASM=`cat allClasses | grep '^org/objectweb/asm/' | wc -l`
@@ -184,6 +184,13 @@ check_shaded_artifacts() {
 		exit 1
 	fi
 
+	SNAPPY=`cat allClasses | grep '^org/xerial/snappy' | wc -l`
+	if [ $SNAPPY == "0" ]; then
+		echo "=============================================================================="
+		echo "Missing snappy dependencies in fat jar"
+		echo "=============================================================================="
+		exit 1
+	fi
 }
 
 # =============================================================================

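The new watchdog check counts snappy classes in the fat jar; the same verification can be run locally. A small self-contained sketch (the jar path is illustrative; adjust it to the actual dist jar name):

	import java.util.jar.JarFile;

	public class SnappyCheck {
		public static void main(String[] args) throws Exception {
			// count snappy entries in the dist jar, mirroring the shell check above
			try (JarFile jar = new JarFile("build-target/lib/flink-dist.jar")) {
				long snappy = jar.stream()
					.filter(e -> e.getName().startsWith("org/xerial/snappy"))
					.count();
				if (snappy == 0) {
					throw new IllegalStateException("Missing snappy dependencies in fat jar");
				}
				System.out.println("snappy entries: " + snappy);
			}
		}
	}
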

[2/4] flink git commit: [FLINK-7103] [dispatcher] Add skeletal structure of Dispatcher component

Posted by ch...@apache.org.
[FLINK-7103] [dispatcher] Add skeletal structure of Dispatcher component

The Dispatcher is responsible for receiving job submissions, persisting the JobGraphs,
spawning JobManagers to execute the jobs and recovering the jobs in case of a master
failure. This commit adds the basic skeleton including the RPC call for job submission.

Add cleanup logic for finished jobs

Pass BlobService to JobManagerRunner

This closes #4260.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/748eba1b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/748eba1b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/748eba1b

Branch: refs/heads/master
Commit: 748eba1b5363aeb9e64d379b9b25d5d997bec7ad
Parents: f68856a
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jul 4 23:15:52 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 11 18:19:25 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    | 294 +++++++++++++++++++
 .../runtime/dispatcher/DispatcherGateway.java   |  55 ++++
 .../dispatcher/StandaloneDispatcher.java        |  86 ++++++
 .../runtime/jobmanager/SubmittedJobGraph.java   |   6 +-
 .../runtime/jobmaster/JobManagerRunner.java     |   6 +-
 .../runtime/jobmaster/JobManagerServices.java   |  11 +-
 .../flink/runtime/minicluster/MiniCluster.java  |  17 ++
 .../minicluster/MiniClusterJobDispatcher.java   |  16 +-
 .../runtime/dispatcher/DispatcherTest.java      | 149 ++++++++++
 .../jobmaster/JobManagerRunnerMockTest.java     |   3 +-
 .../yarn/YarnFlinkApplicationMasterRunner.java  |   7 +
 11 files changed, 639 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/748eba1b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
new file mode 100644
index 0000000..e0ec049
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobService;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for the Dispatcher component. The Dispatcher component is responsible
+ * for receiving job submissions, persisting them, spawning JobManagers to execute
+ * the jobs, and recovering them in case of a master failure. Furthermore, it knows
+ * about the state of the Flink session cluster.
+ */
+public abstract class Dispatcher extends RpcEndpoint<DispatcherGateway> {
+
+	public static final String DISPATCHER_NAME = "dispatcher";
+
+	private final Configuration configuration;
+
+	private final SubmittedJobGraphStore submittedJobGraphStore;
+	private final RunningJobsRegistry runningJobsRegistry;
+
+	private final HighAvailabilityServices highAvailabilityServices;
+	private final BlobServer blobServer;
+	private final HeartbeatServices heartbeatServices;
+	private final MetricRegistry metricRegistry;
+
+	private final FatalErrorHandler fatalErrorHandler;
+
+	private final Map<JobID, JobManagerRunner> jobManagerRunners;
+
+	protected Dispatcher(
+			RpcService rpcService,
+			String endpointId,
+			Configuration configuration,
+			HighAvailabilityServices highAvailabilityServices,
+			BlobServer blobServer,
+			HeartbeatServices heartbeatServices,
+			MetricRegistry metricRegistry,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
+		super(rpcService, endpointId);
+
+		this.configuration = Preconditions.checkNotNull(configuration);
+		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
+		this.blobServer = Preconditions.checkNotNull(blobServer);
+		this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices);
+		this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
+		this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
+
+		this.submittedJobGraphStore = highAvailabilityServices.getSubmittedJobGraphStore();
+		this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
+
+		jobManagerRunners = new HashMap<>(16);
+	}
+
+	//------------------------------------------------------
+	// Lifecycle methods
+	//------------------------------------------------------
+
+	@Override
+	public void shutDown() throws Exception {
+		Exception exception = null;
+		// stop all currently running JobManagerRunners
+		for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
+			jobManagerRunner.shutdown();
+		}
+
+		jobManagerRunners.clear();
+
+		try {
+			submittedJobGraphStore.stop();
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
+
+		try {
+			super.shutDown();
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
+
+		if (exception != null) {
+			throw new FlinkException("Could not properly terminate the Dispatcher.", exception);
+		}
+	}
+
+	//------------------------------------------------------
+	// RPCs
+	//------------------------------------------------------
+
+	@RpcMethod
+	public Acknowledge submitJob(JobGraph jobGraph) throws JobSubmissionException {
+		final JobID jobId = jobGraph.getJobID();
+
+		log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
+
+		final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus;
+
+		try {
+			jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
+		} catch (IOException e) {
+			log.warn("Cannot retrieve job status for {}.", jobId, e);
+			throw new JobSubmissionException(jobId, "Could not retrieve the job status.", e);
+		}
+
+		if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING) {
+			try {
+				submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
+			} catch (Exception e) {
+				log.warn("Cannot persist JobGraph.", e);
+				throw new JobSubmissionException(jobId, "Could not persist JobGraph.", e);
+			}
+
+			final JobManagerRunner jobManagerRunner;
+
+			try {
+				jobManagerRunner = createJobManagerRunner(
+					ResourceID.generate(),
+					jobGraph,
+					configuration,
+					getRpcService(),
+					highAvailabilityServices,
+					blobServer,
+					heartbeatServices,
+					metricRegistry,
+					new DispatcherOnCompleteActions(jobGraph.getJobID()),
+					fatalErrorHandler);
+
+				jobManagerRunner.start();
+			} catch (Exception e) {
+				try {
+					// We should only remove a job from the submitted job graph store
+					// if the initial submission failed. Never in case of a recovery
+					submittedJobGraphStore.removeJobGraph(jobId);
+				} catch (Throwable t) {
+					log.warn("Cannot remove job graph from submitted job graph store.", t);
+					e.addSuppressed(t);
+				}
+
+				throw new JobSubmissionException(jobId, "Could not start JobManager.", e);
+			}
+
+			jobManagerRunners.put(jobId, jobManagerRunner);
+
+			return Acknowledge.get();
+		} else {
+			throw new JobSubmissionException(jobId, "Job has already been submitted and " +
+				"is currently in state " + jobSchedulingStatus + '.');
+		}
+	}
+
+	@RpcMethod
+	public Collection<JobID> listJobs() {
+		// TODO: return proper list of running jobs
+		return jobManagerRunners.keySet();
+	}
+
+	/**
+	 * Cleans up the job-related data from the dispatcher. If cleanupHA is true, then
+	 * the data will also be removed from HA.
+	 *
+	 * @param jobId JobID identifying the job to clean up
+	 * @param cleanupHA True iff HA data shall also be cleaned up
+	 */
+	private void removeJob(JobID jobId, boolean cleanupHA) throws Exception {
+		JobManagerRunner jobManagerRunner = jobManagerRunners.remove(jobId);
+
+		if (jobManagerRunner != null) {
+			jobManagerRunner.shutdown();
+		}
+
+		if (cleanupHA) {
+			submittedJobGraphStore.removeJobGraph(jobId);
+		}
+
+		// TODO: remove job related files from blob server
+	}
+
+	protected abstract JobManagerRunner createJobManagerRunner(
+		ResourceID resourceId,
+		JobGraph jobGraph,
+		Configuration configuration,
+		RpcService rpcService,
+		HighAvailabilityServices highAvailabilityServices,
+		BlobService blobService,
+		HeartbeatServices heartbeatServices,
+		MetricRegistry metricRegistry,
+		OnCompletionActions onCompleteActions,
+		FatalErrorHandler fatalErrorHandler) throws Exception;
+
+	//------------------------------------------------------
+	// Utility classes
+	//------------------------------------------------------
+
+	private class DispatcherOnCompleteActions implements OnCompletionActions {
+
+		private final JobID jobId;
+
+		private DispatcherOnCompleteActions(JobID jobId) {
+			this.jobId = Preconditions.checkNotNull(jobId);
+		}
+
+		@Override
+		public void jobFinished(JobExecutionResult result) {
+			log.info("Job {} finished.", jobId);
+
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						removeJob(jobId, true);
+					} catch (Exception e) {
+						log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
+					}
+				}
+			});
+		}
+
+		@Override
+		public void jobFailed(Throwable cause) {
+			log.info("Job {} failed.", jobId);
+
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						removeJob(jobId, true);
+					} catch (Exception e) {
+						log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
+					}
+				}
+			});
+		}
+
+		@Override
+		public void jobFinishedByOther() {
+			log.info("Job {} was finished by other JobManager.", jobId);
+
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						removeJob(jobId, false);
+					} catch (Exception e) {
+						log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
+					}
+				}
+			});
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/748eba1b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
new file mode 100644
index 0000000..c730bc1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+
+import java.util.Collection;
+
+/**
+ * Gateway for the Dispatcher component.
+ */
+public interface DispatcherGateway extends RpcGateway {
+
+	/**
+	 * Submits a job to the dispatcher.
+	 *
+	 * @param jobGraph JobGraph to submit
+	 * @param timeout RPC timeout
+	 * @return A future that is completed with an acknowledge once the job submission succeeded
+	 */
+	Future<Acknowledge> submitJob(
+		JobGraph jobGraph,
+		@RpcTimeout Time timeout);
+
+	/**
+	 * Lists the current set of submitted jobs.
+	 *
+	 * @param timeout RPC timeout
+	 * @return A future that is completed with the IDs of the currently submitted jobs
+	 */
+	Future<Collection<JobID>> listJobs(
+		@RpcTimeout Time timeout);
+}

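For context, a minimal sketch of driving the new gateway from a client; only
submitJob and listJobs are from this diff, while the surrounding code assumes
a gateway reference resolved through RpcService#connect:

    Future<DispatcherGateway> gatewayFuture =
        rpcService.connect(dispatcherAddress, DispatcherGateway.class);
    DispatcherGateway gateway = gatewayFuture.get();

    // submit and block until the dispatcher acknowledges the submission
    Acknowledge ack = gateway.submitJob(jobGraph, Time.seconds(10L)).get();

    // the submitted job should now appear in the listing
    Collection<JobID> jobs = gateway.listJobs(Time.seconds(10L)).get();
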
http://git-wip-us.apache.org/repos/asf/flink/blob/748eba1b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
new file mode 100644
index 0000000..6687657
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobService;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+/**
+ * Dispatcher implementation which spawns a {@link JobMaster} for each
+ * submitted {@link JobGraph} in the same process. This dispatcher can
+ * be used as the default dispatcher for all session clusters.
+ */
+public class StandaloneDispatcher extends Dispatcher {
+	protected StandaloneDispatcher(
+			RpcService rpcService,
+			String endpointId,
+			Configuration configuration,
+			HighAvailabilityServices highAvailabilityServices,
+			BlobServer blobServer,
+			HeartbeatServices heartbeatServices,
+			MetricRegistry metricRegistry,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
+		super(
+			rpcService,
+			endpointId,
+			configuration,
+			highAvailabilityServices,
+			blobServer,
+			heartbeatServices,
+			metricRegistry,
+			fatalErrorHandler);
+	}
+
+	@Override
+	protected JobManagerRunner createJobManagerRunner(
+			ResourceID resourceId,
+			JobGraph jobGraph,
+			Configuration configuration,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			BlobService blobService,
+			HeartbeatServices heartbeatServices,
+			MetricRegistry metricRegistry,
+			OnCompletionActions onCompleteActions,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
+		// create the standard job manager runner
+		return new JobManagerRunner(
+			resourceId,
+			jobGraph,
+			configuration,
+			rpcService,
+			highAvailabilityServices,
+			blobService,
+			heartbeatServices,
+			metricRegistry,
+			onCompleteActions,
+			fatalErrorHandler);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/748eba1b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
index e868da7..979f3d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.jobmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -44,9 +46,9 @@ public class SubmittedJobGraph implements Serializable {
 	 * @param jobGraph The submitted {@link JobGraph}
 	 * @param jobInfo  The {@link JobInfo}
 	 */
-	public SubmittedJobGraph(JobGraph jobGraph, JobInfo jobInfo) {
+	public SubmittedJobGraph(JobGraph jobGraph, @Nullable JobInfo jobInfo) {
 		this.jobGraph = checkNotNull(jobGraph, "Job graph");
-		this.jobInfo = checkNotNull(jobInfo, "Job info");
+		this.jobInfo = jobInfo;
 	}
 
 	/**

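With jobInfo relaxed to @Nullable, code paths that only hold the graph itself
can construct the wrapper directly. A one-line sketch:

    // JobInfo is now optional, e.g. when a graph is persisted without
    // client-side bookkeeping (illustrative call):
    SubmittedJobGraph entry = new SubmittedJobGraph(jobGraph, null);
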
http://git-wip-us.apache.org/repos/asf/flink/blob/748eba1b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index d7fae35..d557f6b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
@@ -92,6 +93,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 			final Configuration configuration,
 			final RpcService rpcService,
 			final HighAvailabilityServices haServices,
+			final BlobService blobService,
 			final HeartbeatServices heartbeatServices,
 			final OnCompletionActions toNotifyOnComplete,
 			final FatalErrorHandler errorHandler) throws Exception {
@@ -101,6 +103,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 			configuration,
 			rpcService,
 			haServices,
+			blobService,
 			heartbeatServices,
 			new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)),
 			toNotifyOnComplete,
@@ -113,6 +116,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 			final Configuration configuration,
 			final RpcService rpcService,
 			final HighAvailabilityServices haServices,
+			final BlobService blobService,
 			final HeartbeatServices heartbeatServices,
 			final MetricRegistry metricRegistry,
 			final OnCompletionActions toNotifyOnComplete,
@@ -124,7 +128,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 			rpcService,
 			haServices,
 			heartbeatServices,
-			JobManagerServices.fromConfiguration(configuration, haServices),
+			JobManagerServices.fromConfiguration(configuration, blobService),
 			metricRegistry,
 			toNotifyOnComplete,
 			errorHandler);

http://git-wip-us.apache.org/repos/asf/flink/blob/748eba1b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
index ac4d06f..e14f5af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -23,13 +23,13 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
 
 import scala.concurrent.duration.FiniteDuration;
 
@@ -103,15 +103,16 @@ public class JobManagerServices {
 
 	public static JobManagerServices fromConfiguration(
 			Configuration config,
-			HighAvailabilityServices haServices) throws Exception {
+			BlobService blobService) throws Exception {
 
-		final BlobServer blobServer = new BlobServer(config, haServices.createBlobStore());
+		Preconditions.checkNotNull(config);
+		Preconditions.checkNotNull(blobService);
 
 		final long cleanupInterval = config.getLong(
 			ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
 			ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
 
-		final BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval);
+		final BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(blobService, cleanupInterval);
 
 		final FiniteDuration timeout;
 		try {

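The new signature inverts ownership of the BlobServer: fromConfiguration no
longer creates one from the HA services, so the caller constructs, passes in,
and eventually closes it, which is exactly what MiniCluster does in the next
hunk. The resulting call pattern, sketched:

    // the caller owns the BlobServer lifecycle now (sketch; mirrors the
    // MiniCluster changes below)
    BlobServer blobServer = new BlobServer(config, haServices.createBlobStore());
    JobManagerServices jobManagerServices =
        JobManagerServices.fromConfiguration(config, blobServer);
    try {
        // ... run jobs against jobManagerServices ...
    } finally {
        blobServer.close();  // must outlive everything using the blob store
    }
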
http://git-wip-us.apache.org/repos/asf/flink/blob/748eba1b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 6d3a3c4..8841f68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -86,6 +87,9 @@ public class MiniCluster {
 	private HighAvailabilityServices haServices;
 
 	@GuardedBy("lock")
+	private BlobServer blobServer;
+
+	@GuardedBy("lock")
 	private HeartbeatServices heartbeatServices;
 
 	@GuardedBy("lock")
@@ -241,6 +245,8 @@ public class MiniCluster {
 					configuration,
 					commonRpcService.getExecutor());
 
+				blobServer = new BlobServer(configuration, haServices.createBlobStore());
+
 				heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
 
 				// bring up the ResourceManager(s)
@@ -263,6 +269,7 @@ public class MiniCluster {
 				jobDispatcher = new MiniClusterJobDispatcher(
 					configuration,
 					haServices,
+					blobServer,
 					heartbeatServices,
 					metricRegistry,
 					numJobManagers,
@@ -363,6 +370,16 @@ public class MiniCluster {
 		taskManagerRpcServices = null;
 		resourceManagerRpcServices = null;
 
+		// shut down the blob server
+		if (blobServer != null) {
+			try {
+				blobServer.close();
+			} catch (Exception e) {
+				exception = firstOrSuppressed(e, exception);
+			}
+			blobServer = null;
+		}
+
 		// shut down high-availability services
 		if (haServices != null) {
 			try {

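Shutdown keeps going even if a single component fails to close, collecting
failures with firstOrSuppressed (statically imported from ExceptionUtils in
this file). The aggregation pattern in isolation, sketched:

    Exception exception = null;

    try {
        blobServer.close();
    } catch (Exception e) {
        exception = firstOrSuppressed(e, exception);  // keep the first, suppress the rest
    }

    try {
        haServices.close();
    } catch (Exception e) {
        exception = firstOrSuppressed(e, exception);
    }

    if (exception != null) {
        throw exception;  // later failures travel along as suppressed exceptions
    }
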
http://git-wip-us.apache.org/repos/asf/flink/blob/748eba1b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index 1f8ae80..2bb94f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.minicluster;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -99,9 +100,17 @@ public class MiniClusterJobDispatcher {
 			Configuration config,
 			RpcService rpcService,
 			HighAvailabilityServices haServices,
+			BlobServer blobServer,
 			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry) throws Exception {
-		this(config, haServices, heartbeatServices, metricRegistry, 1, new RpcService[] { rpcService });
+		this(
+			config,
+			haServices,
+			blobServer,
+			heartbeatServices,
+			metricRegistry,
+			1,
+			new RpcService[] { rpcService });
 	}
 
 	/**
@@ -119,6 +128,7 @@ public class MiniClusterJobDispatcher {
 	public MiniClusterJobDispatcher(
 			Configuration config,
 			HighAvailabilityServices haServices,
+			BlobServer blobServer,
 			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry,
 			int numJobManagers,
@@ -135,7 +145,7 @@ public class MiniClusterJobDispatcher {
 		this.numJobManagers = numJobManagers;
 
 		LOG.info("Creating JobMaster services");
-		this.jobManagerServices = JobManagerServices.fromConfiguration(config, haServices);
+		this.jobManagerServices = JobManagerServices.fromConfiguration(config, blobServer);
 	}
 
 	// ------------------------------------------------------------------------
@@ -285,6 +295,8 @@ public class MiniClusterJobDispatcher {
 		// to remove the data after job finished
 		try {
 			haServices.getRunningJobsRegistry().clearJob(jobID);
+
+			// TODO: Remove job data from BlobServer
 		}
 		catch (Throwable t) {
 			LOG.warn("Could not clear job {} at the status registry of the high-availability services", jobID, t);

http://git-wip-us.apache.org/repos/asf/flink/blob/748eba1b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
new file mode 100644
index 0000000..a7a86c3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobService;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link Dispatcher} component.
+ */
+public class DispatcherTest extends TestLogger {
+
+	/**
+	 * Tests that we can submit a job to the Dispatcher, which then spawns a
+	 * new JobManagerRunner.
+	 */
+	@Test
+	public void testJobSubmission() throws Exception {
+		TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
+		RpcService rpcService = new TestingRpcService();
+		HighAvailabilityServices haServices = new StandaloneHaServices("localhost", "localhost");
+		HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L);
+		JobManagerRunner jobManagerRunner = mock(JobManagerRunner.class);
+
+		final Time timeout = Time.seconds(5L);
+		final JobGraph jobGraph = mock(JobGraph.class);
+		final JobID jobId = new JobID();
+		when(jobGraph.getJobID()).thenReturn(jobId);
+
+		try {
+			final TestingDispatcher dispatcher = new TestingDispatcher(
+				rpcService,
+				Dispatcher.DISPATCHER_NAME,
+				new Configuration(),
+				haServices,
+				mock(BlobServer.class),
+				heartbeatServices,
+				mock(MetricRegistry.class),
+				fatalErrorHandler,
+				jobManagerRunner,
+				jobId);
+
+			dispatcher.start();
+
+			DispatcherGateway dispatcherGateway = dispatcher.getSelf();
+
+			Future<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout);
+
+			acknowledgeFuture.get();
+
+			verify(jobManagerRunner, Mockito.timeout(timeout.toMilliseconds())).start();
+
+			// check that no error has occurred
+			fatalErrorHandler.rethrowError();
+		} finally {
+			rpcService.stopService();
+		}
+	}
+
+	private static class TestingDispatcher extends Dispatcher {
+
+		private final JobManagerRunner jobManagerRunner;
+		private final JobID expectedJobId;
+
+		protected TestingDispatcher(
+				RpcService rpcService,
+				String endpointId,
+				Configuration configuration,
+				HighAvailabilityServices highAvailabilityServices,
+				BlobServer blobServer,
+				HeartbeatServices heartbeatServices,
+				MetricRegistry metricRegistry,
+				FatalErrorHandler fatalErrorHandler,
+				JobManagerRunner jobManagerRunner,
+				JobID expectedJobId) throws Exception {
+			super(
+				rpcService,
+				endpointId,
+				configuration,
+				highAvailabilityServices,
+				blobServer,
+				heartbeatServices,
+				metricRegistry,
+				fatalErrorHandler);
+
+			this.jobManagerRunner = jobManagerRunner;
+			this.expectedJobId = expectedJobId;
+		}
+
+		@Override
+		protected JobManagerRunner createJobManagerRunner(
+				ResourceID resourceId,
+				JobGraph jobGraph,
+				Configuration configuration,
+				RpcService rpcService,
+				HighAvailabilityServices highAvailabilityServices,
+				BlobService blobService,
+				HeartbeatServices heartbeatServices,
+				MetricRegistry metricRegistry,
+				OnCompletionActions onCompleteActions,
+				FatalErrorHandler fatalErrorHandler) throws Exception {
+			assertEquals(expectedJobId, jobGraph.getJobID());
+
+			return jobManagerRunner;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/748eba1b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index 753c797..7c58879 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.BlobStore;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -111,7 +112,7 @@ public class JobManagerRunnerMockTest extends TestLogger {
 			mockRpc,
 			haServices,
 			heartbeatServices,
-			JobManagerServices.fromConfiguration(new Configuration(), haServices),
+			JobManagerServices.fromConfiguration(new Configuration(), mock(BlobServer.class)),
 			new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()),
 			jobCompletion,
 			jobCompletion));

http://git-wip-us.apache.org/repos/asf/flink/blob/748eba1b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index 2ad9065..0107f80 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
@@ -99,6 +100,9 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 	private RpcService commonRpcService;
 
 	@GuardedBy("lock")
+	private BlobServer blobServer;
+
+	@GuardedBy("lock")
 	private ResourceManager resourceManager;
 
 	@GuardedBy("lock")
@@ -144,6 +148,8 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 					commonRpcService.getExecutor(),
 					HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
 
+				blobServer = new BlobServer(config, haServices.createBlobStore());
+
 				heartbeatServices = HeartbeatServices.fromConfiguration(config);
 
 				metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
@@ -228,6 +234,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 			config,
 			commonRpcService,
 			haServices,
+			blobServer,
 			heartbeatServices,
 			this,
 			this);