Posted to commits@flink.apache.org by dw...@apache.org on 2018/05/04 07:58:22 UTC

[2/2] flink git commit: [FLINK-8620] Enable shipping custom files to BlobStore and accessing them through DistributedCache

[FLINK-8620] Enable shipping custom files to BlobStore and accessing them through DistributedCache

This closes #5580
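
In user code, the feature boils down to registering a file with the execution environment and reading the shipped copy back through the DistributedCache inside a rich function. A minimal sketch (file path and cache name are placeholders; the pattern mirrors the DistributedCacheViaBlobTestProgram added in this commit):

import java.io.File;
import java.nio.file.Files;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CachedFileExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// The file is uploaded via the BlobServer at submission time and
		// materialized on each TaskManager through the FileCache.
		env.registerCachedFile("/path/to/words.txt", "words", false);

		env.fromElements(1)
			.map(new RichMapFunction<Integer, String>() {
				@Override
				public String map(Integer value) throws Exception {
					// Blocks until the local copy is available.
					File cached = getRuntimeContext().getDistributedCache().getFile("words");
					return String.join("\n", Files.readAllLines(cached.toPath()));
				}
			})
			.print();

		env.execute("Distributed cache example");
	}
}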


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0146f8a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0146f8a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0146f8a

Branch: refs/heads/master
Commit: e0146f8ac525e5e3e9e49fb590d5c149773efc9f
Parents: 2cfd89c
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Wed Feb 7 16:21:42 2018 +0100
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Fri May 4 09:56:44 2018 +0200

----------------------------------------------------------------------
 .../client/program/rest/RestClusterClient.java  |   9 +-
 .../api/common/cache/DistributedCache.java      |  53 +++-
 .../java/org/apache/flink/util/FileUtils.java   |  38 +++
 .../pom.xml                                     |  85 ++++++
 .../DistributedCacheViaBlobTestProgram.java     |  77 +++++
 flink-end-to-end-tests/pom.xml                  |   1 +
 flink-end-to-end-tests/test-scripts/common.sh   |   2 +
 ...test_streaming_distributed_cache_via_blob.sh |  40 +++
 .../flink/python/api/PythonPlanBinder.java      |  12 +-
 .../python/api/PythonStreamBinder.java          |  15 +-
 .../environment/PythonEnvironmentFactory.java   |  16 +-
 .../PythonStreamExecutionEnvironment.java       |  36 +--
 .../plantranslate/JobGraphGenerator.java        |  15 +-
 .../webmonitor/handlers/JarRunHandler.java      |   4 +-
 .../apache/flink/runtime/blob/BlobClient.java   | 236 +++++-----------
 .../flink/runtime/blob/BlobOutputStream.java    | 177 ++++++++++++
 .../apache/flink/runtime/blob/BlobUtils.java    |  23 ++
 .../apache/flink/runtime/client/JobClient.java  |   7 +
 .../client/JobSubmissionClientActor.java        |  10 +
 .../flink/runtime/filecache/FileCache.java      | 282 +++++++++----------
 .../apache/flink/runtime/jobgraph/JobGraph.java |  55 +++-
 .../flink/runtime/minicluster/MiniCluster.java  |  16 +-
 .../runtime/taskexecutor/TaskExecutor.java      |   8 +-
 .../taskexecutor/TaskManagerServices.java       |  17 --
 .../apache/flink/runtime/taskmanager/Task.java  |  29 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  32 ++-
 .../flink/runtime/blob/BlobCachePutTest.java    |   2 +-
 .../flink/runtime/blob/BlobClientTest.java      |  52 +++-
 .../flink/runtime/blob/BlobServerPutTest.java   |   2 +-
 .../FileCacheDeleteValidationTest.java          | 145 ----------
 .../filecache/FileCacheDirectoriesTest.java     | 209 ++++++++++++++
 .../filecache/FileCacheReadsFromBlobTest.java   | 120 ++++++++
 .../jobmanager/JobManagerCleanupITCase.java     |   6 +-
 .../flink/runtime/jobmanager/JobSubmitTest.java |   2 +-
 .../TaskManagerServicesBuilder.java             |   9 -
 .../runtime/util/JvmExitOnFatalErrorTest.java   |   6 +-
 .../api/graph/StreamingJobGraphGenerator.java   |   2 +-
 .../tasks/InterruptSensitiveRestoreTest.java    |   3 +-
 .../tasks/TaskCheckpointingBehaviourTest.java   |   3 +-
 .../streaming/util/TestStreamEnvironment.java   |   8 +-
 40 files changed, 1224 insertions(+), 640 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 5c04be7..0921642 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -319,18 +319,19 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
 		CompletableFuture<JobGraph> jobUploadFuture = portFuture.thenCombine(
 			getDispatcherAddress(),
 			(BlobServerPortResponseBody response, String dispatcherAddress) -> {
-				log.info("Uploading jar files.");
 				final int blobServerPort = response.port;
 				final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
 				final List<PermanentBlobKey> keys;
 				try {
-					keys = BlobClient.uploadJarFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
+					log.info("Uploading jar files.");
+					keys = BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
+					jobGraph.uploadUserArtifacts(address, flinkConfig);
 				} catch (IOException ioe) {
-					throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
+					throw new CompletionException(new FlinkException("Could not upload job files.", ioe));
 				}
 
 				for (PermanentBlobKey key : keys) {
-					jobGraph.addBlob(key);
+					jobGraph.addUserJarBlobKey(key);
 				}
 
 				return jobGraph;

http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
index 35a82e8..9cceb83 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
@@ -19,6 +19,8 @@ package org.apache.flink.api.common.cache;
 
 
 import java.io.File;
+import java.io.Serializable;
+import java.net.URI;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -37,18 +39,31 @@ import org.apache.flink.core.fs.Path;
  */
 @Public
 public class DistributedCache {
-	
-	public static class DistributedCacheEntry {
-		
+
+	public static class DistributedCacheEntry implements Serializable {
+
 		public String filePath;
 		public Boolean isExecutable;
-		
-		public DistributedCacheEntry(String filePath, Boolean isExecutable){
+		public boolean isZipped;
+
+		public byte[] blobKey;
+
+		public DistributedCacheEntry(String filePath, Boolean isExecutable, byte[] blobKey, boolean isZipped){
 			this.filePath=filePath;
 			this.isExecutable=isExecutable;
+			this.blobKey = blobKey;
+			this.isZipped = isZipped;
+		}
+
+		public DistributedCacheEntry(String filePath, Boolean isExecutable){
+			this(filePath, isExecutable, null);
+		}
+
+		public DistributedCacheEntry(String filePath, Boolean isExecutable, byte[] blobKey){
+			this(filePath, isExecutable, blobKey, false);
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 
 	private final Map<String, Future<Path>> cacheCopyTasks;
@@ -63,16 +78,17 @@ public class DistributedCache {
 		if (name == null) {
 			throw new NullPointerException("name must not be null");
 		}
-		
+
 		Future<Path> future = cacheCopyTasks.get(name);
 		if (future == null) {
 			throw new IllegalArgumentException("File with name '" + name + "' is not available." +
 					" Did you forget to register the file?");
 		}
-		
+
 		try {
-			Path tmp = future.get();
-			return new File(tmp.toString());
+			final Path path = future.get();
+			URI tmp = path.makeQualified(path.getFileSystem()).toUri();
+			return new File(tmp);
 		}
 		catch (ExecutionException e) {
 			throw new RuntimeException("An error occurred while copying the file.", e.getCause());
@@ -82,17 +98,19 @@ public class DistributedCache {
 					"' from the distributed cache", e);
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Utilities to read/write cache files from/to the configuration
 	// ------------------------------------------------------------------------
-	
+
 	public static void writeFileInfoToConfig(String name, DistributedCacheEntry e, Configuration conf) {
 		int num = conf.getInteger(CACHE_FILE_NUM,0) + 1;
 		conf.setInteger(CACHE_FILE_NUM, num);
 		conf.setString(CACHE_FILE_NAME + num, name);
 		conf.setString(CACHE_FILE_PATH + num, e.filePath);
 		conf.setBoolean(CACHE_FILE_EXE + num, e.isExecutable || new File(e.filePath).canExecute());
+		conf.setBoolean(CACHE_FILE_DIR + num, e.isZipped || new File(e.filePath).isDirectory());
+		conf.setBytes(CACHE_FILE_BLOB_KEY + num, e.blobKey);
 	}
 
 	public static Set<Entry<String, DistributedCacheEntry>> readFileInfoFromConfig(Configuration conf) {
@@ -105,8 +123,11 @@ public class DistributedCache {
 		for (int i = 1; i <= num; i++) {
 			String name = conf.getString(CACHE_FILE_NAME + i, null);
 			String filePath = conf.getString(CACHE_FILE_PATH + i, null);
-			Boolean isExecutable = conf.getBoolean(CACHE_FILE_EXE + i, false);
-			cacheFiles.put(name, new DistributedCacheEntry(filePath, isExecutable));
+			boolean isExecutable = conf.getBoolean(CACHE_FILE_EXE + i, false);
+			boolean isDirectory = conf.getBoolean(CACHE_FILE_DIR + i, false);
+
+			byte[] blobKey = conf.getBytes(CACHE_FILE_BLOB_KEY + i, null);
+			cacheFiles.put(name, new DistributedCacheEntry(filePath, isExecutable, blobKey, isDirectory));
 		}
 		return cacheFiles.entrySet();
 	}
@@ -118,4 +139,8 @@ public class DistributedCache {
 	private static final String CACHE_FILE_PATH = "DISTRIBUTED_CACHE_FILE_PATH_";
 
 	private static final String CACHE_FILE_EXE = "DISTRIBUTED_CACHE_FILE_EXE_";
+
+	private static final String CACHE_FILE_DIR = "DISTRIBUTED_CACHE_FILE_DIR_";
+
+	private static final String CACHE_FILE_BLOB_KEY = "DISTRIBUTED_CACHE_FILE_BLOB_KEY_";
 }
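
For illustration, a sketch of how the extended entry round-trips the new blob key and directory flag through the job configuration (the path and key bytes below are placeholders):

import java.util.Map;

import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.configuration.Configuration;

public class CacheEntryRoundTrip {
	public static void main(String[] args) {
		Configuration conf = new Configuration();

		// Placeholder key bytes; in practice this is the serialized PermanentBlobKey.
		byte[] blobKey = new byte[] {0x01, 0x02};

		// A zipped directory entry: filePath, isExecutable, blobKey, isZipped.
		DistributedCacheEntry entry = new DistributedCacheEntry("/tmp/model-dir", false, blobKey, true);
		DistributedCache.writeFileInfoToConfig("model", entry, conf);

		// On the task side the entry comes back with the new fields intact.
		for (Map.Entry<String, DistributedCacheEntry> e : DistributedCache.readFileInfoFromConfig(conf)) {
			System.out.println(e.getKey() + " zipped=" + e.getValue().isZipped);
		}
	}
}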

http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
index f47825c..46c5621 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.util;
 
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -315,6 +317,42 @@ public final class FileUtils {
 		}
 	}
 
+	/**
+	 * Copies all files from the source path to the target path and sets the executable flag. The paths may be on different file systems.
+	 * @param sourcePath source path to copy from
+	 * @param targetPath target path to copy to
+	 * @param executable whether the target files should be executable
+	 * @throws IOException if the copy fails
+	 */
+	public static void copy(Path sourcePath, Path targetPath, boolean executable) throws IOException {
+		// we unwrap the file system to get raw streams without safety net
+		FileSystem sFS = FileSystem.getUnguardedFileSystem(sourcePath.toUri());
+		FileSystem tFS = FileSystem.getUnguardedFileSystem(targetPath.toUri());
+		if (!tFS.exists(targetPath)) {
+			if (sFS.getFileStatus(sourcePath).isDir()) {
+				tFS.mkdirs(targetPath);
+				FileStatus[] contents = sFS.listStatus(sourcePath);
+				for (FileStatus content : contents) {
+					String distPath = content.getPath().toString();
+					if (content.isDir()) {
+						if (distPath.endsWith("/")) {
+							distPath = distPath.substring(0, distPath.length() - 1);
+						}
+					}
+					String localPath = targetPath.toString() + distPath.substring(distPath.lastIndexOf("/"));
+					copy(content.getPath(), new Path(localPath), executable);
+				}
+			} else {
+				try (FSDataOutputStream lfsOutput = tFS.create(targetPath, FileSystem.WriteMode.NO_OVERWRITE); FSDataInputStream fsInput = sFS.open(sourcePath)) {
+					IOUtils.copyBytes(fsInput, lfsOutput);
+					//noinspection ResultOfMethodCallIgnored
+					new File(targetPath.toString()).setExecutable(executable);
+				} catch (IOException ignored) {
+				}
+			}
+		}
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**
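
A usage sketch of the new FileUtils.copy helper, which the Python binders below switch to from FileCache.copy (the paths are placeholders):

import org.apache.flink.core.fs.Path;
import org.apache.flink.util.FileUtils;

public class CopyExample {
	public static void main(String[] args) throws Exception {
		// Source and target may live on different file systems; directories are copied recursively.
		Path source = new Path("file:///tmp/plan.py");
		Path target = new Path("file:///tmp/flink-dc/plan.py");
		FileUtils.copy(source, target, true); // true: mark the copied file(s) executable
	}
}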

http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/pom.xml b/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/pom.xml
new file mode 100644
index 0000000..046d58d
--- /dev/null
+++ b/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/pom.xml
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+    -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-distributed-cache-via-blob-test_${scala.binary.version}</artifactId>
+	<name>flink-distributed-cache-via-blob</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<version>2.4</version>
+
+				<executions>
+					<!-- DistributedCacheViaBlobTestProgram -->
+					<execution>
+						<id>DistributedCacheViaBlobTestProgram</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<finalName>DistributedCacheViaBlobTestProgram</finalName>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.tests.DistributedCacheViaBlobTestProgram</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram*</include>
+							</includes>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java b/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
new file mode 100644
index 0000000..9ad4b98
--- /dev/null
+++ b/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.stream.Collectors;
+
+/**
+ * End-to-end test program for verifying that files are distributed via the BlobServer and are later accessible
+ * through the DistributedCache. We verify this by uploading a file and later accessing it in a map function. To be
+ * sure we read the version from the cache, we delete the initial file.
+ */
+public class DistributedCacheViaBlobTestProgram {
+
+	public static void main(String[] args) throws Exception {
+
+		final ParameterTool params = ParameterTool.fromArgs(args);
+
+		final Path inputFile = Paths.get(params.getRequired("inputFile"));
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		env.registerCachedFile(inputFile.toString(), "test_data", false);
+
+		env.fromElements(1)
+			.map(new TestMapFunction(inputFile.toAbsolutePath().toString()))
+			.writeAsText(params.getRequired("output"), FileSystem.WriteMode.OVERWRITE);
+
+		env.execute("Distributed Cache Via Blob Test Program");
+	}
+
+	static class TestMapFunction extends RichMapFunction<Integer, String> {
+
+		private String initialPath;
+
+		public TestMapFunction(String initialPath) {
+			this.initialPath = initialPath;
+		}
+
+		@Override
+		public String map(Integer value) throws Exception {
+			final Path testFile = getRuntimeContext().getDistributedCache().getFile("test_data").toPath();
+
+			if (testFile.toAbsolutePath().toString().equals(initialPath)) {
+				throw new RuntimeException(String.format("Operator should access copy from cache rather than the " +
+					"initial file. Input file path: %s. Cache file path: %s", initialPath, testFile));
+			}
+
+			return Files.readAllLines(testFile)
+				.stream()
+				.collect(Collectors.joining("\n"));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 34586ed..6a03998 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -40,6 +40,7 @@ under the License.
 		<module>flink-datastream-allround-test</module>
 		<module>flink-stream-sql-test</module>
 		<module>flink-bucketing-sink-test</module>
+		<module>flink-distributed-cache-via-blob-test</module>
 	</modules>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 73885e1..d8e0de7 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -194,6 +194,8 @@ function stop_cluster {
       | grep -v "Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration" \
       | grep -v "java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException" \
       | grep -v "java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration" \
+      | grep -v "java.lang.Exception: Execution was suspended" \
+      | grep -v "Caused by: java.lang.Exception: JobManager is shutting down" \
       | grep -iq "exception"; then
     echo "Found exception in log files:"
     cat $FLINK_DIR/log/*

http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-end-to-end-tests/test-scripts/test_streaming_distributed_cache_via_blob.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_distributed_cache_via_blob.sh b/flink-end-to-end-tests/test-scripts/test_streaming_distributed_cache_via_blob.sh
new file mode 100755
index 0000000..b85f610
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_distributed_cache_via_blob.sh
@@ -0,0 +1,40 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-distributed-cache-via-blob-test/target/DistributedCacheViaBlobTestProgram.jar
+
+echo "Testing distributing files via DistributedCache & BlobServer"
+
+start_cluster
+
+mkdir -p $TEST_DATA_DIR
+
+$FLINK_DIR/bin/flink run -p 1 $TEST_PROGRAM_JAR --inputFile $TEST_INFRA_DIR/test-data/words --tempDir $TEST_DATA_DIR/ --output $TEST_DATA_DIR/out/cl_out_pf
+
+OUTPUT=`cat $TEST_DATA_DIR/out/cl_out_pf`
+
+EXPECTED="Hello World how are you, my dear dear world"
+if [[ "$OUTPUT" != "$EXPECTED" ]]; then
+  echo "Output from Flink program does not match expected output."
+  echo -e "EXPECTED: $EXPECTED"
+  echo -e "ACTUAL: $OUTPUT"
+  PASS=""
+fi

http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 106ac7a..a4f38fe 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -45,7 +45,7 @@ import org.apache.flink.python.api.functions.util.StringDeserializerMap;
 import org.apache.flink.python.api.functions.util.StringTupleDeserializerMap;
 import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
 import org.apache.flink.python.api.util.SetCache;
-import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
 
 import org.slf4j.Logger;
@@ -182,10 +182,7 @@ public class PythonPlanBinder {
 
 				receivePlan(env);
 
-				// upload files to remote FS and register on Distributed Cache
-				deleteIfExists(tmpDistributedDir);
-				FileCache.copy(tmpPlanFilesPath, tmpDistributedDir, true);
-				env.registerCachedFile(tmpDistributedDir.toUri().toString(), FLINK_PYTHON_DC_ID);
+				env.registerCachedFile(tmpPlanFilesPath.toUri().toString(), FLINK_PYTHON_DC_ID, true);
 
 				JobExecutionResult jer = env.execute();
 				long runtime = jer.getNetRuntime();
@@ -197,9 +194,6 @@ public class PythonPlanBinder {
 		} finally {
 			try {
 				// clean up created files
-				FileSystem distributedFS = tmpDistributedDir.getFileSystem();
-				distributedFS.delete(tmpDistributedDir, true);
-
 				FileSystem local = FileSystem.getLocalFileSystem();
 				local.delete(new Path(tmpPlanFilesDir), true);
 			} catch (IOException ioe) {
@@ -252,7 +246,7 @@ public class PythonPlanBinder {
 	private static void copyFile(Path source, Path targetDirectory, String name) throws IOException {
 		Path targetFilePath = new Path(targetDirectory, name);
 		deleteIfExists(targetFilePath);
-		FileCache.copy(source, targetFilePath, true);
+		FileUtils.copy(source, targetFilePath, true);
 	}
 
 	//====Plan==========================================================================================================

http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/PythonStreamBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/PythonStreamBinder.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/PythonStreamBinder.java
index 97b1030..0742eac 100644
--- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/PythonStreamBinder.java
+++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/PythonStreamBinder.java
@@ -22,10 +22,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.streaming.python.PythonOptions;
 import org.apache.flink.streaming.python.api.environment.PythonEnvironmentFactory;
 import org.apache.flink.streaming.python.util.InterpreterUtils;
+import org.apache.flink.util.FileUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,23 +110,19 @@ public class PythonStreamBinder {
 			targetDir.getFileSystem().mkdirs(targetDir);
 
 			// copy user files to temporary location
-			Path tmpPlanFilesPath = new Path(localTmpPath);
-			copyFile(planPath, tmpPlanFilesPath, planPath.getName());
+			copyFile(planPath, targetDir, planPath.getName());
 			for (String file : filesToCopy) {
 				Path source = new Path(file);
-				copyFile(source, tmpPlanFilesPath, source.getName());
+				copyFile(source, targetDir, source.getName());
 			}
 
 			String planNameWithExtension = planPath.getName();
 			String planName = planNameWithExtension.substring(0, planNameWithExtension.indexOf(".py"));
 
-			InterpreterUtils.initAndExecPythonScript(new PythonEnvironmentFactory(localTmpPath, tmpDistributedDir, planName), Paths.get(localTmpPath), planName, planArgumentsArray);
+			InterpreterUtils.initAndExecPythonScript(new PythonEnvironmentFactory(localTmpPath, planName), Paths.get(localTmpPath), planName, planArgumentsArray);
 		} finally {
 			try {
 				// clean up created files
-				FileSystem distributedFS = tmpDistributedDir.getFileSystem();
-				distributedFS.delete(tmpDistributedDir, true);
-
 				FileSystem local = FileSystem.getLocalFileSystem();
 				local.delete(new Path(localTmpPath), true);
 			} catch (IOException ioe) {
@@ -147,6 +143,7 @@ public class PythonStreamBinder {
 	private static void copyFile(Path source, Path targetDirectory, String name) throws IOException {
 		Path targetFilePath = new Path(targetDirectory, name);
 		deleteIfExists(targetFilePath);
-		FileCache.copy(source, targetFilePath, true);
+		FileUtils.copy(source, targetFilePath, true);
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonEnvironmentFactory.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonEnvironmentFactory.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonEnvironmentFactory.java
index 8e437fe..b9b9f8c 100644
--- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonEnvironmentFactory.java
+++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonEnvironmentFactory.java
@@ -33,12 +33,10 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  */
 public class PythonEnvironmentFactory {
 	private final String localTmpPath;
-	private final Path tmpDistributedDir;
 	private final String scriptName;
 
-	public PythonEnvironmentFactory(String localTmpPath, Path tmpDistributedDir, String scriptName) {
+	public PythonEnvironmentFactory(String localTmpPath, String scriptName) {
 		this.localTmpPath = localTmpPath;
-		this.tmpDistributedDir = tmpDistributedDir;
 		this.scriptName = scriptName;
 	}
 
@@ -50,7 +48,7 @@ public class PythonEnvironmentFactory {
 	 * executed.
 	 */
 	public PythonStreamExecutionEnvironment get_execution_environment() {
-		return new PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment(), new Path(localTmpPath), tmpDistributedDir, scriptName);
+		return new PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment(), new Path(localTmpPath), scriptName);
 	}
 
 	/**
@@ -64,7 +62,7 @@ public class PythonEnvironmentFactory {
 	 * @return A local execution environment with the specified parallelism.
 	 */
 	public PythonStreamExecutionEnvironment create_local_execution_environment(Configuration config) {
-		return new PythonStreamExecutionEnvironment(new LegacyLocalStreamEnvironment(config), new Path(localTmpPath), tmpDistributedDir, scriptName);
+		return new PythonStreamExecutionEnvironment(new LegacyLocalStreamEnvironment(config), new Path(localTmpPath), scriptName);
 	}
 
 	/**
@@ -76,7 +74,7 @@ public class PythonEnvironmentFactory {
 	 */
 	public PythonStreamExecutionEnvironment create_local_execution_environment(int parallelism, Configuration config) {
 		return new PythonStreamExecutionEnvironment(
-			StreamExecutionEnvironment.createLocalEnvironment(parallelism, config), new Path(localTmpPath), tmpDistributedDir, scriptName);
+			StreamExecutionEnvironment.createLocalEnvironment(parallelism, config), new Path(localTmpPath), scriptName);
 	}
 
 	/**
@@ -95,7 +93,7 @@ public class PythonEnvironmentFactory {
 	public PythonStreamExecutionEnvironment create_remote_execution_environment(
 		String host, int port, String... jar_files) {
 		return new PythonStreamExecutionEnvironment(
-			StreamExecutionEnvironment.createRemoteEnvironment(host, port, jar_files), new Path(localTmpPath), tmpDistributedDir, scriptName);
+			StreamExecutionEnvironment.createRemoteEnvironment(host, port, jar_files), new Path(localTmpPath), scriptName);
 	}
 
 	/**
@@ -116,7 +114,7 @@ public class PythonEnvironmentFactory {
 	public PythonStreamExecutionEnvironment create_remote_execution_environment(
 		String host, int port, Configuration config, String... jar_files) {
 		return new PythonStreamExecutionEnvironment(
-			StreamExecutionEnvironment.createRemoteEnvironment(host, port, config, jar_files), new Path(localTmpPath), tmpDistributedDir, scriptName);
+			StreamExecutionEnvironment.createRemoteEnvironment(host, port, config, jar_files), new Path(localTmpPath), scriptName);
 	}
 
 	/**
@@ -137,6 +135,6 @@ public class PythonEnvironmentFactory {
 	public PythonStreamExecutionEnvironment create_remote_execution_environment(
 		String host, int port, int parallelism, String... jar_files) {
 		return new PythonStreamExecutionEnvironment(
-			StreamExecutionEnvironment.createRemoteEnvironment(host, port, parallelism, jar_files), new Path(localTmpPath), tmpDistributedDir, scriptName);
+			StreamExecutionEnvironment.createRemoteEnvironment(host, port, parallelism, jar_files), new Path(localTmpPath), scriptName);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
index 0684e35..e49c617 100644
--- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
+++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
@@ -21,17 +21,11 @@ package org.apache.flink.streaming.python.api.environment;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.python.PythonOptions;
 import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
 import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
 import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
@@ -61,7 +55,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.UUID;
 
 /**
  * A thin wrapper layer over {@link StreamExecutionEnvironment}.
@@ -78,12 +71,10 @@ public class PythonStreamExecutionEnvironment {
 	private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
 	private final StreamExecutionEnvironment env;
 	private final Path pythonTmpCachePath;
-	private final Path tmpDistributedDir;
 
-	PythonStreamExecutionEnvironment(StreamExecutionEnvironment env, Path tmpLocalDir, Path tmpDistributedDir, String scriptName) {
+	PythonStreamExecutionEnvironment(StreamExecutionEnvironment env, Path tmpLocalDir, String scriptName) {
 		this.env = env;
 		this.pythonTmpCachePath = tmpLocalDir;
-		this.tmpDistributedDir = tmpDistributedDir;
 		env.getConfig().setGlobalJobParameters(new PythonJobParameters(scriptName));
 		registerJythonSerializers(this.env);
 	}
@@ -252,7 +243,6 @@ public class PythonStreamExecutionEnvironment {
 	public JobExecutionResult execute() throws Exception {
 		distributeFiles();
 		JobExecutionResult result = this.env.execute();
-		cleanupDistributedFiles();
 		return result;
 	}
 
@@ -265,33 +255,11 @@ public class PythonStreamExecutionEnvironment {
 	public JobExecutionResult execute(String job_name) throws Exception {
 		distributeFiles();
 		JobExecutionResult result = this.env.execute(job_name);
-		cleanupDistributedFiles();
 		return result;
 	}
 
 	private void distributeFiles() throws IOException {
-		Path remoteRootDir;
-		if (this.env instanceof LocalStreamEnvironment) {
-			remoteRootDir = new Path(PythonOptions.DC_TMP_DIR.defaultValue());
-		} else {
-			remoteRootDir = tmpDistributedDir;
-		}
-		Path remoteDir = new Path(remoteRootDir, "flink_cache_" + UUID.randomUUID());
-
-		FileCache.copy(pythonTmpCachePath, remoteDir, true);
-
-		this.env.registerCachedFile(remoteDir.toUri().toString(), PythonConstants.FLINK_PYTHON_DC_ID);
+		this.env.registerCachedFile(pythonTmpCachePath.getPath(), PythonConstants.FLINK_PYTHON_DC_ID);
 	}
 
-	private void cleanupDistributedFiles() throws IOException {
-		for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : this.env.getCachedFiles()) {
-			Path fileUri = new Path(e.f1.filePath);
-			FileSystem fs = fileUri.getFileSystem();
-			LOG.debug(String.format("Cleaning up cached path: %s, uriPath: %s, fileSystem: %s",
-				e.f1.filePath,
-				fileUri.getPath(),
-				fs.getClass().getName()));
-			fs.delete(new Path(fileUri.getPath()), true);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 846e7f4..0c0c3f3 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -18,19 +18,18 @@
 
 package org.apache.flink.optimizer.plantranslate;
 
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.aggregators.AggregatorRegistry;
 import org.apache.flink.api.common.aggregators.AggregatorWithName;
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.dag.TempMode;
@@ -49,8 +48,6 @@ import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
 import org.apache.flink.optimizer.plan.SourcePlanNode;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
 import org.apache.flink.optimizer.plan.WorksetPlanNode;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.optimizer.util.Utils;
 import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -59,11 +56,11 @@ import org.apache.flink.runtime.iterative.task.IterationHeadTask;
 import org.apache.flink.runtime.iterative.task.IterationIntermediateTask;
 import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
 import org.apache.flink.runtime.iterative.task.IterationTailTask;
-import org.apache.flink.runtime.jobgraph.JobEdge;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.InputFormatVertex;
+import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.operators.BatchTask;
@@ -84,6 +81,8 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.Visitor;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -246,7 +245,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 
 		// add registered cache file into job configuration
 		for (Entry<String, DistributedCacheEntry> e : program.getOriginalPlan().getCachedFiles()) {
-			DistributedCache.writeFileInfoToConfig(e.getKey(), e.getValue(), graph.getJobConfiguration());
+			graph.addUserArtifact(e.getKey(), e.getValue());
 		}
 
 		// release all references again

http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 2e928b0..0605bf1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -116,13 +116,13 @@ public class JarRunHandler extends
 			final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
 			final List<PermanentBlobKey> keys;
 			try {
-				keys = BlobClient.uploadJarFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars());
+				keys = BlobClient.uploadFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars());
 			} catch (IOException ioe) {
 				throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
 			}
 
 			for (PermanentBlobKey key : keys) {
-				jobGraph.addBlob(key);
+				jobGraph.addUserJarBlobKey(key);
 			}
 
 			return jobGraph;

http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 8e6b328..1301740 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -22,10 +22,11 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.net.SSLUtils;
-import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.IOUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,23 +46,20 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.security.MessageDigest;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
 
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_RELATED_CONTENT;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_UNRELATED_CONTENT;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
-import static org.apache.flink.runtime.blob.BlobUtils.readFully;
-import static org.apache.flink.runtime.blob.BlobUtils.readLength;
-import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
+import static org.apache.flink.runtime.blob.BlobUtils.readExceptionFromStream;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -348,38 +346,11 @@ public final class BlobClient implements Closeable {
 			LOG.debug("PUT BLOB buffer (" + len + " bytes) to " + socket.getLocalSocketAddress() + ".");
 		}
 
-		try {
-			final OutputStream os = this.socket.getOutputStream();
-			final MessageDigest md = BlobUtils.createMessageDigest();
-
-			// Send the PUT header
-			sendPutHeader(os, jobId, blobType);
-
-			// Send the value in iterations of BUFFER_SIZE
-			int remainingBytes = len;
-
-			while (remainingBytes > 0) {
-				// want a common code path for byte[] and InputStream at the BlobServer
-				// -> since for InputStream we don't know a total size beforehand, send lengths iteratively
-				final int bytesToSend = Math.min(BUFFER_SIZE, remainingBytes);
-				writeLength(bytesToSend, os);
-
-				os.write(value, offset, bytesToSend);
-
-				// Update the message digest
-				md.update(value, offset, bytesToSend);
-
-				remainingBytes -= bytesToSend;
-				offset += bytesToSend;
-			}
-			// send -1 as the stream end
-			writeLength(-1, os);
-
+		try (BlobOutputStream os = new BlobOutputStream(jobId, blobType, socket)) {
+			os.write(value, offset, len);
 			// Receive blob key and compare
-			final InputStream is = this.socket.getInputStream();
-			return receiveAndCheckPutResponse(is, md, blobType);
-		}
-		catch (Throwable t) {
+			return os.finish();
+		} catch (Throwable t) {
 			BlobUtils.closeSilently(socket, LOG);
 			throw new IOException("PUT operation failed: " + t.getMessage(), t);
 		}
@@ -413,111 +384,16 @@ public final class BlobClient implements Closeable {
 			LOG.debug("PUT BLOB stream to {}.", socket.getLocalSocketAddress());
 		}
 
-		try {
-			final OutputStream os = this.socket.getOutputStream();
-			final MessageDigest md = BlobUtils.createMessageDigest();
-			final byte[] xferBuf = new byte[BUFFER_SIZE];
-
-			// Send the PUT header
-			sendPutHeader(os, jobId, blobType);
-
-			while (true) {
-				// since we don't know a total size here, send lengths iteratively
-				final int read = inputStream.read(xferBuf);
-				if (read < 0) {
-					// we are done. send a -1 and be done
-					writeLength(-1, os);
-					break;
-				}
-				if (read > 0) {
-					writeLength(read, os);
-					os.write(xferBuf, 0, read);
-					md.update(xferBuf, 0, read);
-				}
-			}
-
-			// Receive blob key and compare
-			final InputStream is = this.socket.getInputStream();
-			return receiveAndCheckPutResponse(is, md, blobType);
-		}
-		catch (Throwable t) {
+		try (BlobOutputStream os = new BlobOutputStream(jobId, blobType, socket)) {
+			IOUtils.copyBytes(inputStream, os, BUFFER_SIZE, false);
+			return os.finish();
+		} catch (Throwable t) {
 			BlobUtils.closeSilently(socket, LOG);
 			throw new IOException("PUT operation failed: " + t.getMessage(), t);
 		}
 	}
 
 	/**
-	 * Constructs and writes the header data for a PUT request to the given output stream.
-	 *
-	 * @param outputStream
-	 * 		the output stream to write the PUT header data to
-	 * @param jobId
-	 * 		the ID of job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
-	 * @param blobType
-	 * 		whether the BLOB should become permanent or transient
-	 *
-	 * @throws IOException
-	 * 		thrown if an I/O error occurs while writing the header data to the output stream
-	 */
-	private static void sendPutHeader(
-			OutputStream outputStream, @Nullable JobID jobId, BlobKey.BlobType blobType)
-			throws IOException {
-		// Signal type of operation
-		outputStream.write(PUT_OPERATION);
-		if (jobId == null) {
-			outputStream.write(JOB_UNRELATED_CONTENT);
-		} else {
-			outputStream.write(JOB_RELATED_CONTENT);
-			outputStream.write(jobId.getBytes());
-		}
-		outputStream.write(blobType.ordinal());
-	}
-
-	/**
-	 * Reads the response from the input stream and throws in case of errors.
-	 *
-	 * @param is
-	 * 		stream to read from
-	 * @param md
-	 * 		message digest to check the response against
-	 * @param blobType
-	 * 		whether the BLOB should be permanent or transient
-	 *
-	 * @throws IOException
-	 * 		if the response is an error, the message digest does not match or reading the response
-	 * 		failed
-	 */
-	private static BlobKey receiveAndCheckPutResponse(
-			InputStream is, MessageDigest md, BlobKey.BlobType blobType)
-			throws IOException {
-		int response = is.read();
-		if (response < 0) {
-			throw new EOFException("Premature end of response");
-		}
-		else if (response == RETURN_OKAY) {
-
-			BlobKey remoteKey = BlobKey.readFromInputStream(is);
-			byte[] localHash = md.digest();
-
-			if (blobType != remoteKey.getType()) {
-				throw new IOException("Detected data corruption during transfer");
-			}
-			if (!Arrays.equals(localHash, remoteKey.getHash())) {
-				throw new IOException("Detected data corruption during transfer");
-			}
-
-			return remoteKey;
-		}
-		else if (response == RETURN_ERROR) {
-			Throwable cause = readExceptionFromStream(is);
-			throw new IOException("Server side error: " + cause.getMessage(), cause);
-		}
-		else {
-			throw new IOException("Unrecognized response: " + response + '.');
-		}
-	}
-
-	/**
 	 * Uploads the JAR files to the {@link PermanentBlobService} of the {@link BlobServer} at the
 	 * given address with HA as configured.
 	 *
@@ -527,37 +403,27 @@ public final class BlobClient implements Closeable {
 	 * 		Any additional configuration for the blob client
 	 * @param jobId
 	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
-	 * @param jars
-	 * 		List of JAR files to upload
+	 * @param files
+	 * 		List of files to upload
 	 *
 	 * @throws IOException
 	 * 		if the upload fails
 	 */
-	public static List<PermanentBlobKey> uploadJarFiles(
-			InetSocketAddress serverAddress, Configuration clientConfig, JobID jobId, List<Path> jars)
+	public static List<PermanentBlobKey> uploadFiles(
+			InetSocketAddress serverAddress, Configuration clientConfig, JobID jobId, List<Path> files)
 			throws IOException {
 
 		checkNotNull(jobId);
 
-		if (jars.isEmpty()) {
+		if (files.isEmpty()) {
 			return Collections.emptyList();
 		} else {
 			List<PermanentBlobKey> blobKeys = new ArrayList<>();
 
 			try (BlobClient blobClient = new BlobClient(serverAddress, clientConfig)) {
-				for (final Path jar : jars) {
-					final FileSystem fs = jar.getFileSystem();
-					FSDataInputStream is = null;
-					try {
-						is = fs.open(jar);
-						final PermanentBlobKey key =
-							(PermanentBlobKey) blobClient.putInputStream(jobId, is, PERMANENT_BLOB);
-						blobKeys.add(key);
-					} finally {
-						if (is != null) {
-							is.close();
-						}
-					}
+				for (final Path file : files) {
+					final PermanentBlobKey key = blobClient.uploadFile(jobId, file);
+					blobKeys.add(key);
 				}
 			}
 
@@ -565,21 +431,55 @@ public final class BlobClient implements Closeable {
 		}
 	}
 
-	// --------------------------------------------------------------------------------------------
-	//  Miscellaneous
-	// --------------------------------------------------------------------------------------------
-
-	private static Throwable readExceptionFromStream(InputStream in) throws IOException {
-		int len = readLength(in);
-		byte[] bytes = new byte[len];
-		readFully(in, bytes, 0, len, "Error message");
+	/**
+	 * Uploads a single file to the {@link PermanentBlobService} of the given {@link BlobServer}.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
+	 * @param file
+	 * 		file to upload
+	 *
+	 * @throws IOException
+	 * 		if the upload fails
+	 */
+	public PermanentBlobKey uploadFile(JobID jobId, Path file) throws IOException {
+		final FileSystem fs = file.getFileSystem();
+		if (fs.getFileStatus(file).isDir()) {
+			return uploadDirectory(jobId, file, fs);
+		} else {
+			try (InputStream is = fs.open(file)) {
+				return (PermanentBlobKey) putInputStream(jobId, is, PERMANENT_BLOB);
+			}
+		}
+	}
 
-		try {
-			return (Throwable) InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader());
+	private PermanentBlobKey uploadDirectory(JobID jobId, Path file, FileSystem fs) throws IOException {
+		try (BlobOutputStream blobOutputStream = new BlobOutputStream(jobId, PERMANENT_BLOB, socket)) {
+			try (ZipOutputStream zipStream = new ZipOutputStream(blobOutputStream)) {
+				compressDirectoryToZipfile(fs, fs.getFileStatus(file), fs.getFileStatus(file), zipStream);
+				zipStream.finish();
+				return (PermanentBlobKey) blobOutputStream.finish();
+			}
 		}
-		catch (ClassNotFoundException e) {
-			// should never occur
-			throw new IOException("Could not transfer error message", e);
+	}
+
+	private static void compressDirectoryToZipfile(FileSystem fs, FileStatus rootDir, FileStatus sourceDir, ZipOutputStream out) throws IOException {
+		for (FileStatus file : fs.listStatus(sourceDir.getPath())) {
+			LOG.info("Zipping file: {}", file);
+			if (file.isDir()) {
+				compressDirectoryToZipfile(fs, rootDir, file, out);
+			} else {
+				String entryName = file.getPath().getPath().replace(rootDir.getPath().getPath(), "");
+				LOG.info("Zipping entry: {}, file: {}, rootDir: {}", entryName, file, rootDir);
+				ZipEntry entry = new ZipEntry(entryName);
+				out.putNextEntry(entry);
+
+				try (FSDataInputStream in = fs.open(file.getPath())) {
+					IOUtils.copyBytes(in, out, false);
+				}
+				out.closeEntry();
+			}
 		}
 	}
+
 }
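
For reference, a sketch of the upload path that the clients above now follow; the method and parameter names are placeholders taken from the RestClusterClient hunk:

import java.net.InetSocketAddress;
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class UploadSketch {
	static void uploadJobFiles(JobGraph jobGraph, String dispatcherAddress, int blobServerPort,
			Configuration flinkConfig) throws Exception {
		InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);

		// User jars go through the (renamed) bulk upload...
		List<PermanentBlobKey> keys =
			BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
		for (PermanentBlobKey key : keys) {
			jobGraph.addUserJarBlobKey(key);
		}

		// ...while registered cache entries are shipped via JobGraph#uploadUserArtifacts
		// (directories are zipped by the BlobClient before the PUT).
		jobGraph.uploadUserArtifacts(address, flinkConfig);
	}
}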

http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobOutputStream.java
new file mode 100644
index 0000000..923b5c4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobOutputStream.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.security.MessageDigest;
+import java.util.Arrays;
+
+import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_RELATED_CONTENT;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_UNRELATED_CONTENT;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
+import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
+
+/**
+ * The BLOB output stream is a special implementation of an {@link OutputStream} to send data via PUT to the BLOB server.
+ */
+final class BlobOutputStream extends OutputStream {
+
+	private static final Logger LOG = LoggerFactory.getLogger(BlobOutputStream.class);
+
+	private final BlobKey.BlobType blobType;
+	private final OutputStream socketStream;
+	private final Socket socket;
+	private final MessageDigest md;
+
+	BlobOutputStream(JobID jobID, BlobKey.BlobType blobType, Socket socket) throws IOException {
+		this.blobType = blobType;
+
+		if (socket.isClosed()) {
+			throw new IllegalStateException("BLOB Client is not connected. " +
+				"Client has been shut down or encountered an error before.");
+		}
+
+		this.socket = socket;
+		this.socketStream = socket.getOutputStream();
+		this.md = BlobUtils.createMessageDigest();
+		sendPutHeader(socketStream, jobID, blobType);
+	}
+
+	@Override
+	public void write(int b) throws IOException {
+		writeLength(1, socketStream);
+		socketStream.write(b);
+		md.update((byte) b);
+	}
+
+	@Override
+	public void write(byte[] b, int off, int len) throws IOException {
+		// Send the value in iterations of BUFFER_SIZE
+		int remainingBytes = len;
+
+		while (remainingBytes > 0) {
+			// want a common code path for byte[] and InputStream at the BlobServer
+			// -> since for InputStream we don't know a total size beforehand, send lengths iteratively
+			final int bytesToSend = Math.min(BUFFER_SIZE, remainingBytes);
+			writeLength(bytesToSend, socketStream);
+
+			socketStream.write(b, off, bytesToSend);
+
+			// Update the message digest
+			md.update(b, off, bytesToSend);
+
+			remainingBytes -= bytesToSend;
+			off += bytesToSend;
+		}
+	}
+
+	public BlobKey finish() throws IOException {
+		// send -1 as the stream end
+		writeLength(-1, socketStream);
+
+		// Receive blob key and compare
+		final InputStream is = this.socket.getInputStream();
+		return receiveAndCheckPutResponse(is, md, blobType);
+	}
+
+	/**
+	 * Constructs and writes the header data for a PUT request to the given output stream.
+	 *
+	 * @param outputStream
+	 * 		the output stream to write the PUT header data to
+	 * @param jobId
+	 * 		the ID of job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
+	 *
+	 * @throws IOException
+	 * 		thrown if an I/O error occurs while writing the header data to the output stream
+	 */
+	private static void sendPutHeader(
+		OutputStream outputStream, @Nullable JobID jobId, BlobKey.BlobType blobType)
+		throws IOException {
+		// Signal type of operation
+		outputStream.write(PUT_OPERATION);
+		if (jobId == null) {
+			outputStream.write(JOB_UNRELATED_CONTENT);
+		} else {
+			outputStream.write(JOB_RELATED_CONTENT);
+			outputStream.write(jobId.getBytes());
+		}
+		outputStream.write(blobType.ordinal());
+	}
+
+	/**
+	 * Reads the response from the input stream and throws in case of errors.
+	 *
+	 * @param is
+	 * 		stream to read from
+	 * @param md
+	 * 		message digest to check the response against
+	 * @param blobType
+	 * 		whether the BLOB should be permanent or transient
+	 *
+	 * @throws IOException
+	 * 		if the response is an error, the message digest does not match or reading the response
+	 * 		failed
+	 */
+	private static BlobKey receiveAndCheckPutResponse(
+		InputStream is, MessageDigest md, BlobKey.BlobType blobType)
+		throws IOException {
+		int response = is.read();
+		if (response < 0) {
+			throw new EOFException("Premature end of response");
+		}
+		else if (response == RETURN_OKAY) {
+
+			BlobKey remoteKey = BlobKey.readFromInputStream(is);
+			byte[] localHash = md.digest();
+
+			if (blobType != remoteKey.getType()) {
+				throw new IOException("Detected data corruption during transfer");
+			}
+			if (!Arrays.equals(localHash, remoteKey.getHash())) {
+				throw new IOException("Detected data corruption during transfer");
+			}
+
+			return remoteKey;
+		}
+		else if (response == RETURN_ERROR) {
+			Throwable cause = BlobUtils.readExceptionFromStream(is);
+			throw new IOException("Server side error: " + cause.getMessage(), cause);
+		}
+		else {
+			throw new IOException("Unrecognized response: " + response + '.');
+		}
+	}
+}
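A minimal sketch (not part of the commit) of how this stream is meant to be driven, mirroring BlobClient#uploadDirectory above; jobId, socket and content are placeholders:

    // Hedged sketch: data is sent as length-prefixed chunks of at most
    // BUFFER_SIZE; finish() writes the -1 terminator, reads the BlobKey
    // returned by the server and verifies it against the local digest.
    try (BlobOutputStream out = new BlobOutputStream(jobId, BlobKey.BlobType.PERMANENT_BLOB, socket)) {
        out.write(content);
        BlobKey key = out.finish();
    }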

http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index a21c7d6..a61d679 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.StringUtils;
 
 import org.slf4j.Logger;
@@ -353,6 +354,28 @@ public class BlobUtils {
 	}
 
 	/**
+	 * Reads an exception from the given {@link InputStream}.
+	 *
+	 * @param in the input stream to read from
+	 * @return exception that was read
+	 * @throws IOException thrown if an I/O error occurs while reading from the input
+	 *                     stream
+	 */
+	static Throwable readExceptionFromStream(InputStream in) throws IOException {
+		int len = readLength(in);
+		byte[] bytes = new byte[len];
+		readFully(in, bytes, 0, len, "Error message");
+
+		try {
+			return (Throwable) InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader());
+		}
+		catch (ClassNotFoundException e) {
+			// should never occur
+			throw new IOException("Could not transfer error message", e);
+		}
+	}
+
+	/**
 	 * Auxiliary method to read a particular number of bytes from an input stream. This method blocks until the
 	 * requested number of bytes have been read from the stream. If the stream cannot offer enough data, an
 	 * {@link EOFException} is thrown.
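This helper is the counterpart of the RETURN_ERROR branch in BlobOutputStream#receiveAndCheckPutResponse(). A rough sketch of the framing it expects on the wire (the server-side stream name and the cause are illustrative, not from this commit):

    // Hedged sketch of the error framing: a length-prefixed,
    // Java-serialized Throwable following the RETURN_ERROR byte.
    out.write(RETURN_ERROR);
    byte[] bytes = InstantiationUtil.serializeObject(new IOException("artifact rejected"));
    writeLength(bytes.length, out);
    out.write(bytes);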

http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 4b84857..76d9bd6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -429,6 +429,13 @@ public class JobClient {
 				"Could not upload the program's JAR files to the JobManager.", e);
 		}
 
+		try {
+			jobGraph.uploadUserArtifacts(blobServerAddress, config);
+		} catch (IOException e) {
+			throw new JobSubmissionException(jobGraph.getJobID(),
+					"Could not upload custom user artifacts to the job manager.", e);
+		}
+
 		CompletableFuture<Acknowledge> submissionFuture = jobManagerGateway.submitJob(jobGraph, ListeningBehaviour.DETACHED, timeout);
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
index e9824ae..9b95633 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
@@ -162,6 +162,16 @@ public class JobSubmissionClientActor extends JobClientActor {
 							"Could not upload the jar files to the job manager.",
 							e));
 				}
+
+				try {
+					jobGraph.uploadUserArtifacts(blobServerAddress, clientConfig);
+				} catch (IOException e) {
+					throw new CompletionException(
+						new JobSubmissionException(
+							jobGraph.getJobID(),
+							"Could not upload custom user artifacts to the job manager.",
+							e));
+				}
 			},
 			getContext().dispatcher());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index d9d021a..9021751 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -18,27 +18,33 @@
 
 package org.apache.flink.runtime.filecache;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ShutdownHookUtil;
 
-import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
@@ -46,6 +52,10 @@ import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * The FileCache is used to create the local files for the registered cache files when a task is deployed.
@@ -54,12 +64,14 @@ import java.util.concurrent.TimeUnit;
  */
 public class FileCache {
 
-	static final Logger LOG = LoggerFactory.getLogger(FileCache.class);
+	private static final Logger LOG = LoggerFactory.getLogger(FileCache.class);
 
 	/** cache-wide lock to ensure consistency. copies are not done under this lock. */
 	private final Object lock = new Object();
 
-	private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
+	private final Map<JobID, Map<String, Future<Path>>> entries;
+
+	private final Map<JobID, Set<ExecutionAttemptID>> jobRefHolders;
 
 	private final ScheduledExecutorService executorService;
 
@@ -69,11 +81,23 @@ public class FileCache {
 
 	private int nextDirectory;
 
+	private final PermanentBlobService blobService;
+
+	private final long cleanupInterval; //in milliseconds
+
 	// ------------------------------------------------------------------------
 
-	public FileCache(String[] tempDirectories) throws IOException {
+	public FileCache(String[] tempDirectories, PermanentBlobService blobService) throws IOException {
+		this (tempDirectories, blobService, Executors.newScheduledThreadPool(10,
+			new ExecutorThreadFactory("flink-file-cache")), 5000);
+	}
+
+	@VisibleForTesting
+	FileCache(String[] tempDirectories, PermanentBlobService blobService,
+		ScheduledExecutorService executorService, long cleanupInterval) throws IOException {
 
 		Preconditions.checkNotNull(tempDirectories);
+		this.cleanupInterval = cleanupInterval;
 
 		storageDirectories = new File[tempDirectories.length];
 
@@ -99,9 +123,10 @@ public class FileCache {
 
 		this.shutdownHook = createShutdownHook(this, LOG);
 
-		this.entries = new HashMap<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>>();
-		this.executorService = Executors.newScheduledThreadPool(10,
-				new ExecutorThreadFactory("flink-file-cache"));
+		this.entries = new HashMap<>();
+		this.jobRefHolders = new HashMap<>();
+		this.executorService = executorService;
+		this.blobService = blobService;
 	}
 
 	/**
@@ -114,7 +139,7 @@ public class FileCache {
 			if (es != null) {
 				es.shutdown();
 				try {
-					es.awaitTermination(5000L, TimeUnit.MILLISECONDS);
+					es.awaitTermination(cleanupInterval, TimeUnit.MILLISECONDS);
 				}
 				catch (InterruptedException e) {
 					// may happen
@@ -122,6 +147,7 @@ public class FileCache {
 			}
 
 			entries.clear();
+			jobRefHolders.clear();
 
 			// clean up the all storage directories
 			for (File dir : storageDirectories) {
@@ -143,30 +169,24 @@ public class FileCache {
 	/**
 	 * If the file doesn't exist locally, it will be copied to the temp directory.
 	 *
-	 * @param name  The name under which the file is registered.
 	 * @param entry The cache entry descriptor (path, executable flag)
 	 * @param jobID The ID of the job for which the file is copied.
 	 * @return The handle to the task that copies the file.
 	 */
-	public Future<Path> createTmpFile(String name, DistributedCacheEntry entry, JobID jobID) {
+	public Future<Path> createTmpFile(String name, DistributedCacheEntry entry, JobID jobID, ExecutionAttemptID executionId) throws Exception {
 		synchronized (lock) {
-			Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID);
-			if (jobEntries == null) {
-				jobEntries = new HashMap<String, Tuple4<Integer, File, Path, Future<Path>>>();
-				entries.put(jobID, jobEntries);
-			}
+			Map<String, Future<Path>> jobEntries = entries.computeIfAbsent(jobID, k -> new HashMap<>());
+
+			// register reference holder
+			final Set<ExecutionAttemptID> refHolders = jobRefHolders.computeIfAbsent(jobID, id -> new HashSet<>());
+			refHolders.add(executionId);
 
-			// tuple is (ref-count, parent-temp-dir, cached-file-path, copy-process)
-			Tuple4<Integer, File, Path, Future<Path>> fileEntry = jobEntries.get(name);
+			Future<Path> fileEntry = jobEntries.get(name);
 			if (fileEntry != null) {
 				// file is already in the cache. return a future that
 				// immediately returns the file
-				fileEntry.f0 = fileEntry.f0 + 1;
-
-				// return the future. may be that the copy is still in progress
-				return fileEntry.f3;
-			}
-			else {
+				return fileEntry;
+			} else {
 				// need to copy the file
 
 				// create the target path
@@ -184,75 +204,18 @@ public class FileCache {
 				Path target = new Path(tempDirToUse.getAbsolutePath() + "/" + sourceFile);
 
 				// kick off the copying
-				CopyProcess cp = new CopyProcess(entry, target);
-				FutureTask<Path> copyTask = new FutureTask<Path>(cp);
+				Callable<Path> cp = new CopyFromBlobProcess(entry, jobID, blobService, target);
+				FutureTask<Path> copyTask = new FutureTask<>(cp);
 				executorService.submit(copyTask);
 
 				// store our entry
-				jobEntries.put(name, new Tuple4<Integer, File, Path, Future<Path>>(1, tempDirToUse, target, copyTask));
+				jobEntries.put(name, copyTask);
 
 				return copyTask;
 			}
 		}
 	}
 
-	/**
-	 * Deletes the local file after a 5 second delay.
-	 *
-	 * @param name  The name under which the file is registered.
-	 * @param jobID The ID of the job for which the file is copied.
-	 */
-	public void deleteTmpFile(String name, JobID jobID) {
-		DeleteProcess dp = new DeleteProcess(lock, entries, name, jobID);
-		executorService.schedule(dp, 5000L, TimeUnit.MILLISECONDS);
-	}
-
-	boolean holdsStillReference(String name, JobID jobId) {
-		Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobId);
-		if (jobEntries != null) {
-			Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name);
-			return entry != null && entry.f0 > 0;
-		}
-		else {
-			return false;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	public static void copy(Path sourcePath, Path targetPath, boolean executable) throws IOException {
-		// TODO rewrite this to make it participate in the closable registry and the lifecycle of a task.
-		// we unwrap the file system to get raw streams without safety net
-		FileSystem sFS = FileSystem.getUnguardedFileSystem(sourcePath.toUri());
-		FileSystem tFS = FileSystem.getUnguardedFileSystem(targetPath.toUri());
-		if (!tFS.exists(targetPath)) {
-			if (sFS.getFileStatus(sourcePath).isDir()) {
-				tFS.mkdirs(targetPath);
-				FileStatus[] contents = sFS.listStatus(sourcePath);
-				for (FileStatus content : contents) {
-					String distPath = content.getPath().toString();
-					if (content.isDir()) {
-						if (distPath.endsWith("/")) {
-							distPath = distPath.substring(0, distPath.length() - 1);
-						}
-					}
-					String localPath = targetPath.toString() + distPath.substring(distPath.lastIndexOf("/"));
-					copy(content.getPath(), new Path(localPath), executable);
-				}
-			} else {
-				try (FSDataOutputStream lfsOutput = tFS.create(targetPath, FileSystem.WriteMode.NO_OVERWRITE); FSDataInputStream fsInput = sFS.open(sourcePath)) {
-					IOUtils.copyBytes(fsInput, lfsOutput);
-					//noinspection ResultOfMethodCallIgnored
-					new File(targetPath.toString()).setExecutable(executable);
-				} catch (IOException ioe) {
-					LOG.error("could not copy file to local file cache.", ioe);
-				}
-			}
-		}
-	}
-
 	private static Thread createShutdownHook(final FileCache cache, final Logger logger) {
 
 		return ShutdownHookUtil.addShutdownHook(
@@ -262,50 +225,92 @@ public class FileCache {
 		);
 	}
 
+	public void releaseJob(JobID jobId, ExecutionAttemptID executionId) {
+		checkNotNull(jobId);
+
+		synchronized (lock) {
+			Set<ExecutionAttemptID> jobRefCounter = jobRefHolders.get(jobId);
+
+			if (jobRefCounter == null || jobRefCounter.isEmpty()) {
+				LOG.warn("improper use of releaseJob() without a matching number of createTmpFile() calls for jobId " + jobId);
+				return;
+			}
+
+			jobRefCounter.remove(executionId);
+			if (jobRefCounter.isEmpty()) {
+				executorService.schedule(new DeleteProcess(jobId), cleanupInterval, TimeUnit.MILLISECONDS);
+			}
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  background processes
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Asynchronous file copy process.
+	 * Asynchronous file copy process from blob server.
 	 */
-	private static class CopyProcess implements Callable<Path> {
-
-		private final Path filePath;
-		private final Path cachedPath;
-		private boolean executable;
+	private static class CopyFromBlobProcess implements Callable<Path> {
 
-		public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-			this.filePath = new Path(e.filePath);
-			this.executable = e.isExecutable;
-			this.cachedPath = cachedPath;
+		private final PermanentBlobKey blobKey;
+		private final Path target;
+		private final boolean isDirectory;
+		private final boolean isExecutable;
+		private final JobID jobID;
+		private final PermanentBlobService blobService;
+
+		CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, Path target) throws Exception {
+				this.isExecutable = e.isExecutable;
+				this.isDirectory = e.isZipped;
+				this.jobID = jobID;
+				this.blobService = blobService;
+				this.blobKey = InstantiationUtil.deserializeObject(e.blobKey, Thread.currentThread().getContextClassLoader());
+				this.target = target;
 		}
 
 		@Override
 		public Path call() throws IOException {
-			// let exceptions propagate. we can retrieve them later from
-			// the future and report them upon access to the result
-			copy(filePath, cachedPath, this.executable);
-			return cachedPath;
+			final File file = blobService.getFile(jobID, blobKey);
+
+			if (isDirectory) {
+				try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
+					ZipEntry entry;
+					while ((entry = zis.getNextEntry()) != null) {
+						String fileName = entry.getName();
+						Path newFile = new Path(target, fileName);
+						if (entry.isDirectory()) {
+							target.getFileSystem().mkdirs(newFile);
+						} else {
+							try (FSDataOutputStream fsDataOutputStream = target.getFileSystem()
+									.create(newFile, FileSystem.WriteMode.NO_OVERWRITE)) {
+								IOUtils.copyBytes(zis, fsDataOutputStream, false);
+							}
+							//noinspection ResultOfMethodCallIgnored
+							new File(newFile.getPath()).setExecutable(isExecutable);
+						}
+						zis.closeEntry();
+					}
+				}
+				Files.delete(file.toPath());
+				return target;
+			} else {
+				//noinspection ResultOfMethodCallIgnored
+				file.setExecutable(isExecutable);
+				return Path.fromLocalFile(file);
+			}
+
 		}
 	}
 
 	/**
 	 * If no task is using this file after 5 seconds, clear it.
 	 */
-	private static class DeleteProcess implements Runnable {
-
-		private final Object lock;
-		private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
+	@VisibleForTesting
+	class DeleteProcess implements Runnable {
 
-		private final String name;
 		private final JobID jobID;
 
-		public DeleteProcess(Object lock, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries,
-								String name, JobID jobID) {
-			this.lock = lock;
-			this.entries = entries;
-			this.name = name;
+		DeleteProcess(JobID jobID) {
 			this.jobID = jobID;
 		}
 
@@ -313,53 +318,26 @@ public class FileCache {
 		public void run() {
 			try {
 				synchronized (lock) {
-					Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID);
 
-					if (jobEntries != null) {
-						Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name);
+					Set<ExecutionAttemptID> jobRefs = jobRefHolders.get(jobID);
+					if (jobRefs != null && jobRefs.isEmpty()) {
+						// abort the copy
+						for (Future<Path> fileFuture : entries.get(jobID).values()) {
+							fileFuture.cancel(true);
+						}
 
-						if (entry != null) {
-							int count = entry.f0;
-							if (count > 1) {
-								// multiple references still
-								entry.f0 = count - 1;
-							}
-							else {
-								// we remove the last reference
-								jobEntries.remove(name);
-								if (jobEntries.isEmpty()) {
-									entries.remove(jobID);
-								}
-
-								// abort the copy
-								entry.f3.cancel(true);
-
-								// remove the file
-								File file = new File(entry.f2.toString());
-								if (file.exists()) {
-									if (file.isDirectory()) {
-										FileUtils.deleteDirectory(file);
-									}
-									else if (!file.delete()) {
-										LOG.error("Could not delete locally cached file " + file.getAbsolutePath());
-									}
-								}
-
-								// remove the job wide temp directory, if it is now empty
-								File parent = entry.f1;
-								if (parent.isDirectory()) {
-									String[] children = parent.list();
-									if (children == null || children.length == 0) {
-										//noinspection ResultOfMethodCallIgnored
-										parent.delete();
-									}
-								}
-							}
+						//remove job specific entries in maps
+						entries.remove(jobID);
+						jobRefHolders.remove(jobID);
+
+						// remove the job wide temp directories
+						for (File storageDirectory : storageDirectories) {
+							File tempDir = new File(storageDirectory, jobID.toString());
+							FileUtils.deleteDirectory(tempDir);
 						}
 					}
 				}
-			}
-			catch (IOException e) {
+			} catch (IOException e) {
 				LOG.error("Could not delete file from local file cache.", e);
 			}
 		}
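A rough sketch (not part of the commit) of the per-attempt lifecycle the reworked FileCache follows; in the real code path the Task reads the cache entry back from the job configuration, so the names below are placeholders:

    // Hedged sketch: one execution attempt registering and releasing a cached file.
    Future<Path> pathFuture = fileCache.createTmpFile(name, entry, jobId, executionAttemptId);
    Path localPath = pathFuture.get();   // blocks until the BLOB is fetched (and unzipped for directories)
    try {
        // ... the task works with the local file or directory ...
    } finally {
        // drop this attempt's reference; once the last one is gone the
        // job-wide temp directories are removed after cleanupInterval
        fileCache.releaseJob(jobId, executionAttemptId);
    }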

http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 889f8ae..bc1a795 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -21,11 +21,13 @@ package org.apache.flink.runtime.jobgraph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
@@ -34,6 +36,7 @@ import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -98,6 +101,9 @@ public class JobGraph implements Serializable {
 	/** Set of JAR files required to run this job. */
 	private final List<Path> userJars = new ArrayList<Path>();
 
+	/** Set of custom files required to run this job. */
+	private final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = new HashMap<>();
+
 	/** Set of blob keys identifying the JAR files required to run this job. */
 	private final List<PermanentBlobKey> userJarBlobKeys = new ArrayList<>();
 
@@ -267,7 +273,7 @@ public class JobGraph implements Serializable {
 	 * Sets the execution config. This method eagerly serializes the ExecutionConfig for future RPC
 	 * transport. Further modification of the referenced ExecutionConfig object will not affect
 	 * this serialized copy.
-	 * 
+	 *
 	 * @param executionConfig The ExecutionConfig to be serialized.
 	 * @throws IOException Thrown if the serialization of the ExecutionConfig fails
 	 */
@@ -489,12 +495,35 @@ public class JobGraph implements Serializable {
 	}
 
 	/**
+	 * Adds the path of a custom file required to run the job on a task manager.
+	 *
+	 * @param name a name under which this artifact will be accessible through {@link DistributedCache}
+	 * @param file path of a custom file required to run the job on a task manager
+	 */
+	public void addUserArtifact(String name, DistributedCache.DistributedCacheEntry file) {
+		if (file == null) {
+			throw new IllegalArgumentException();
+		}
+
+		userArtifacts.putIfAbsent(name, file);
+	}
+
+	/**
+	 * Gets the custom files registered for this job, keyed by the name under which they
+	 * will be accessible through {@link DistributedCache}.
+	 *
+	 * @return The map of registered user artifacts
+	 */
+	public Map<String, DistributedCache.DistributedCacheEntry> getUserArtifacts() {
+		return userArtifacts;
+	}
+
+	/**
 	 * Adds the BLOB referenced by the key to the JobGraph's dependencies.
 	 *
 	 * @param key
 	 *        path of the JAR file required to run the job on a task manager
 	 */
-	public void addBlob(PermanentBlobKey key) {
+	public void addUserJarBlobKey(PermanentBlobKey key) {
 		if (key == null) {
 			throw new IllegalArgumentException();
 		}
@@ -535,7 +564,7 @@ public class JobGraph implements Serializable {
 			InetSocketAddress blobServerAddress,
 			Configuration blobClientConfig) throws IOException {
 		if (!userJars.isEmpty()) {
-			List<PermanentBlobKey> blobKeys = BlobClient.uploadJarFiles(
+			List<PermanentBlobKey> blobKeys = BlobClient.uploadFiles(
 				blobServerAddress, blobClientConfig, jobID, userJars);
 
 			for (PermanentBlobKey blobKey : blobKeys) {
@@ -550,4 +579,24 @@ public class JobGraph implements Serializable {
 	public String toString() {
 		return "JobGraph(jobId: " + jobID + ")";
 	}
+
+	public void uploadUserArtifacts(InetSocketAddress blobServerAddress, Configuration clientConfig) throws IOException {
+		if (!userArtifacts.isEmpty()) {
+			try (BlobClient blobClient = new BlobClient(blobServerAddress, clientConfig)) {
+				for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) {
+
+					final PermanentBlobKey key = blobClient.uploadFile(jobID,
+						new Path(userArtifact.getValue().filePath));
+
+					DistributedCache.writeFileInfoToConfig(
+						userArtifact.getKey(),
+						new DistributedCache.DistributedCacheEntry(
+							userArtifact.getValue().filePath,
+							userArtifact.getValue().isExecutable,
+							InstantiationUtil.serializeObject(key)),
+						jobConfiguration);
+				}
+			}
+		}
+	}
 }
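Putting the pieces together, a minimal sketch (not part of the commit) of how an artifact travels through the JobGraph; the path and name are placeholders:

    // Hedged sketch: register an artifact and ship it to the BLOB store at submission time.
    jobGraph.addUserArtifact(
        "words",
        new DistributedCache.DistributedCacheEntry("hdfs:///data/words.txt", false));
    // invoked by the client before submission, see the JobClient,
    // JobSubmissionClientActor and MiniCluster changes in this commit
    jobGraph.uploadUserArtifacts(blobServerAddress, clientConfig);
    // each entry is re-written into the job configuration together with the serialized
    // PermanentBlobKey, so that TaskManagers can fetch it through the FileCache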

http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 64d46c6..380ac0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -579,6 +579,8 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 	public void runDetached(JobGraph job) throws JobExecutionException, InterruptedException {
 		checkNotNull(job, "job is null");
 
+		uploadUserArtifacts(job);
+
 		final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);
 
 		try {
@@ -602,6 +604,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 	public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
 		checkNotNull(job, "job is null");
 
+		uploadUserArtifacts(job);
 		final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);
 
 		final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
@@ -624,6 +627,15 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 		}
 	}
 
+	private void uploadUserArtifacts(JobGraph job) throws JobExecutionException {
+		try {
+			final InetSocketAddress blobAddress = new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort());
+			job.uploadUserArtifacts(blobAddress, miniClusterConfiguration.getConfiguration());
+		} catch (IOException e) {
+			throw new JobExecutionException(job.getJobID(), "Could not upload user artifacts", e);
+		}
+	}
+
 	public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
 		final DispatcherGateway dispatcherGateway;
 		try {
@@ -675,7 +687,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 			CompletableFuture<List<PermanentBlobKey>> jarUploadFuture = uploadJarFiles(currentDispatcherGateway, job.getJobID(), job.getUserJars());
 			return jarUploadFuture.thenAccept(blobKeys -> {
 					for (PermanentBlobKey blobKey : blobKeys) {
-						job.addBlob(blobKey);
+						job.addUserJarBlobKey(blobKey);
 					}
 				});
 		} else {
@@ -690,7 +702,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 				InetSocketAddress blobServerAddress = new InetSocketAddress(currentDispatcherGateway.getHostname(), blobServerPort);
 
 				try {
-					return BlobClient.uploadJarFiles(blobServerAddress, miniClusterConfiguration.getConfiguration(), jobId, jars);
+					return BlobClient.uploadFiles(blobServerAddress, miniClusterConfiguration.getConfiguration(), jobId, jars);
 				} catch (IOException ioe) {
 					throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
 				}