Posted to commits@flink.apache.org by ch...@apache.org on 2018/06/21 15:30:09 UTC

[2/7] flink git commit: [FLINK-9623][runtime] Move zipping logic out of blobservice

[FLINK-9623][runtime] Move zipping logic out of blobservice

This closes #6187.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f07ecc6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f07ecc6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f07ecc6

Branch: refs/heads/master
Commit: 3f07ecc652c07101ae1b1a8fb17f89a2d1206e1e
Parents: 137231a
Author: zentol <ch...@apache.org>
Authored: Mon Jun 4 13:50:44 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jun 21 14:26:09 2018 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/common/Plan.java  | 20 +----
 .../api/common/cache/DistributedCache.java      | 31 +++++++-
 .../java/org/apache/flink/util/FileUtils.java   | 59 +++++++++++++++
 .../org/apache/flink/util/FileUtilsTest.java    | 78 +++++++++++++++++++-
 .../plantranslate/JobGraphGenerator.java        | 54 ++++++++++++--
 .../plantranslate/JobGraphGeneratorTest.java    | 69 ++++++++++++++++-
 .../apache/flink/runtime/blob/BlobClient.java   | 42 +----------
 .../flink/runtime/filecache/FileCache.java      | 63 ++++++----------
 .../apache/flink/runtime/jobgraph/JobGraph.java |  3 +-
 .../flink/runtime/blob/BlobClientTest.java      | 46 ------------
 .../filecache/FileCacheDirectoriesTest.java     | 19 ++---
 .../api/graph/StreamingJobGraphGenerator.java   |  7 +-
 .../streaming/util/TestStreamEnvironment.java   |  6 --
 13 files changed, 313 insertions(+), 184 deletions(-)
----------------------------------------------------------------------
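
In outline, the flow after this change (a sketch; the paths and the "client",
"jobId", "localCopy" and "targetDir" names are assumed context, not code from
this commit):

    // client side: local directories are zipped up front with the new FileUtils helpers
    Path dir = new Path("/tmp/cacheDir");
    Path zip = FileUtils.compressDirectory(dir, new Path("/tmp/cacheDir.zip"));
    // the zip travels through the blob-service as an ordinary file, since
    // BlobClient.uploadFile no longer special-cases directories (see below)
    PermanentBlobKey key = client.uploadFile(jobId, zip);
    // task side: the FileCache expands the retrieved copy again
    Path restored = FileUtils.expandDirectory(localCopy, targetDir);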


http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index db10ce4..efbc4fa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -26,17 +26,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.io.File;
 import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Visitable;
 import org.apache.flink.util.Visitor;
 
@@ -342,20 +337,7 @@ public class Plan implements Visitable<Operator<?>> {
 	 */
 	public void registerCachedFile(String name, DistributedCacheEntry entry) throws IOException {
 		if (!this.cacheFile.containsKey(name)) {
-			try {
-				URI u = new URI(entry.filePath);
-				if (!u.getPath().startsWith("/")) {
-					u = new File(entry.filePath).toURI();
-				}
-				FileSystem fs = FileSystem.get(u);
-				if (fs.exists(new Path(u.getPath()))) {
-					this.cacheFile.put(name, new DistributedCacheEntry(u.toString(), entry.isExecutable));
-				} else {
-					throw new IOException("File " + u.toString() + " doesn't exist.");
-				}
-			} catch (URISyntaxException ex) {
-				throw new IOException("Invalid path: " + entry.filePath, ex);
-			}
+			this.cacheFile.put(name, entry);
 		} else {
 			throw new IOException("cache file " + name + "already exists!");
 		}

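registerCachedFile is now pure book-keeping: the eager URI/existence check is
gone, so a bad path surfaces during job submission rather than at registration.
A minimal sketch (the artifact name and path are hypothetical):

    // no file-system access happens here anymore; only duplicate names are rejected
    plan.registerCachedFile("words", new DistributedCacheEntry("/data/words.txt", false));
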
http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
index 4bb4ece..2ad98fb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.Path;
 import java.io.File;
 import java.io.Serializable;
 import java.net.URI;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -42,6 +43,10 @@ public class DistributedCache {
 
 	/**
 	 * Meta info about an entry in {@link DistributedCache}.
+	 *
+	 * <p>Entries have different semantics for local directories depending on where we are in the job-submission process.
+	 * After registration through the API {@code filePath} denotes the original directory.
+	 * After the upload to the cluster (which includes zipping the directory), {@code filePath} denotes the (server-side) copy of the zip.
 	 */
 	public static class DistributedCacheEntry implements Serializable {
 
@@ -51,6 +56,17 @@ public class DistributedCache {
 
 		public byte[] blobKey;
 
+		/** Client-side constructor used by the API for initial registration. */
+		public DistributedCacheEntry(String filePath, Boolean isExecutable) {
+			this(filePath, isExecutable, null);
+		}
+
+		/** Client-side constructor used during job-submission for zipped directories. */
+		public DistributedCacheEntry(String filePath, boolean isExecutable, boolean isZipped) {
+			this(filePath, isExecutable, null, isZipped);
+		}
+
+		/** Server-side constructor used during job-submission for zipped directories. */
 		public DistributedCacheEntry(String filePath, Boolean isExecutable, byte[] blobKey, boolean isZipped) {
 			this.filePath = filePath;
 			this.isExecutable = isExecutable;
@@ -58,13 +74,20 @@ public class DistributedCache {
 			this.isZipped = isZipped;
 		}
 
-		public DistributedCacheEntry(String filePath, Boolean isExecutable){
-			this(filePath, isExecutable, null);
-		}
-
+		/** Server-side constructor used during job-submission for files. */
 		public DistributedCacheEntry(String filePath, Boolean isExecutable, byte[] blobKey){
 			this(filePath, isExecutable, blobKey, false);
 		}
+
+		@Override
+		public String toString() {
+			return "DistributedCacheEntry{" +
+				"filePath='" + filePath + '\'' +
+				", isExecutable=" + isExecutable +
+				", isZipped=" + isZipped +
+				", blobKey=" + Arrays.toString(blobKey) +
+				'}';
+		}
 	}
 
 	// ------------------------------------------------------------------------

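A sketch of the entry life-cycle the new javadoc describes (the paths are
hypothetical, and "blobKeyBytes" stands in for a serialized blob key):

    // after API registration: filePath is the original local directory
    DistributedCacheEntry registered = new DistributedCacheEntry("/tmp/myDir", false);
    // after client-side zipping during job submission: filePath is the zip
    DistributedCacheEntry zipped = new DistributedCacheEntry("/tmp/myDir.zip", false, true);
    // server side, after the upload: the blob key is attached as well
    DistributedCacheEntry uploaded = new DistributedCacheEntry("/tmp/myDir.zip", false, blobKeyBytes, true);
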
http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
index 46c5621..713ed61 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -32,6 +32,9 @@ import java.nio.file.AccessDeniedException;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
 import java.util.Random;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -353,6 +356,62 @@ public final class FileUtils {
 		}
 	}
 
+	public static Path compressDirectory(Path directory, Path target) throws IOException {
+		FileSystem sourceFs = directory.getFileSystem();
+		FileSystem targetFs = target.getFileSystem();
+
+		try (ZipOutputStream out = new ZipOutputStream(targetFs.create(target, FileSystem.WriteMode.NO_OVERWRITE))) {
+			addToZip(directory, sourceFs, directory.getParent(), out);
+		}
+		return target;
+	}
+
+	private static void addToZip(Path fileOrDirectory, FileSystem fs, Path rootDir, ZipOutputStream out) throws IOException {
+		String relativePath = fileOrDirectory.getPath().replace(rootDir.getPath() + '/', "");
+		if (fs.getFileStatus(fileOrDirectory).isDir()) {
+			out.putNextEntry(new ZipEntry(relativePath + '/'));
+			for (FileStatus containedFile : fs.listStatus(fileOrDirectory)) {
+				addToZip(containedFile.getPath(), fs, rootDir, out);
+			}
+		} else {
+			ZipEntry entry = new ZipEntry(relativePath);
+			out.putNextEntry(entry);
+
+			try (FSDataInputStream in = fs.open(fileOrDirectory)) {
+				IOUtils.copyBytes(in, out, false);
+			}
+			out.closeEntry();
+		}
+	}
+
+	public static Path expandDirectory(Path file, Path targetDirectory) throws IOException {
+		FileSystem sourceFs = file.getFileSystem();
+		FileSystem targetFs = targetDirectory.getFileSystem();
+		Path rootDir = null;
+		try (ZipInputStream zis = new ZipInputStream(sourceFs.open(file))) {
+			ZipEntry entry;
+			while ((entry = zis.getNextEntry()) != null) {
+				Path relativePath = new Path(entry.getName());
+				if (rootDir == null) {
+					// the first entry contains the name of the original directory that was zipped
+					rootDir = relativePath;
+				}
+
+				Path newFile = new Path(targetDirectory, relativePath);
+				if (entry.isDirectory()) {
+					targetFs.mkdirs(newFile);
+				} else {
+					try (FSDataOutputStream fileStream = targetFs.create(newFile, FileSystem.WriteMode.NO_OVERWRITE)) {
+						// do not close the streams here as it prevents access to further zip entries
+						IOUtils.copyBytes(zis, fileStream, false);
+					}
+				}
+				zis.closeEntry();
+			}
+		}
+		return new Path(targetDirectory, rootDir);
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**

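A round-trip through the two new helpers (a sketch with hypothetical paths).
Note that expandDirectory returns the path of the re-created top-level
directory, because the first entry written by compressDirectory carries the
original directory name:

    Path zip = FileUtils.compressDirectory(new Path("/tmp/rootDir"), new Path("/tmp/rootDir.zip"));
    Path restored = FileUtils.expandDirectory(zip, new Path("/tmp/extracted"));
    // restored now points to /tmp/extracted/rootDir
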
http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
index 4cd4814..b42ee22 100644
--- a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
@@ -27,11 +27,20 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.AccessDeniedException;
-
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -166,10 +175,77 @@ public class FileUtilsTest {
 		assertFalse(parent.exists());
 	}
 
+	@Test
+	public void testCompression() throws IOException {
+		final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
+			+ "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
+			+ "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n"
+			+ "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n"
+			+ "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n"
+			+ "hohen Werke Sind herrlich wie am ersten Tag.\n"
+			+ "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n"
+			+ "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n"
+			+ "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n"
+			+ "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
+			+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n"
+			+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n"
+			+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n"
+			+ "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
+
+		final java.nio.file.Path compressDir = tmp.newFolder("compressDir").toPath();
+		final java.nio.file.Path extractDir = tmp.newFolder("extractDir").toPath();
+
+		final java.nio.file.Path originalDir = Paths.get("rootDir");
+		final java.nio.file.Path emptySubDir = originalDir.resolve("emptyDir");
+		final java.nio.file.Path fullSubDir = originalDir.resolve("fullDir");
+		final java.nio.file.Path file1 = originalDir.resolve("file1");
+		final java.nio.file.Path file2 = originalDir.resolve("file2");
+		final java.nio.file.Path file3 = fullSubDir.resolve("file3");
+
+		Files.createDirectory(compressDir.resolve(originalDir));
+		Files.createDirectory(compressDir.resolve(emptySubDir));
+		Files.createDirectory(compressDir.resolve(fullSubDir));
+		Files.copy(new ByteArrayInputStream(testFileContent.getBytes(StandardCharsets.UTF_8)), compressDir.resolve(file1));
+		Files.createFile(compressDir.resolve(file2));
+		Files.copy(new ByteArrayInputStream(testFileContent.getBytes(StandardCharsets.UTF_8)), compressDir.resolve(file3));
+
+		final Path zip = FileUtils.compressDirectory(
+			new Path(compressDir.resolve(originalDir).toString()),
+			new Path(compressDir.resolve(originalDir) + ".zip"));
+
+		FileUtils.expandDirectory(zip, new Path(extractDir.toAbsolutePath().toString()));
+
+		assertDirEquals(compressDir.resolve(originalDir), extractDir.resolve(originalDir));
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
 
+	private static void assertDirEquals(java.nio.file.Path expected, java.nio.file.Path actual) throws IOException {
+		assertEquals(Files.isDirectory(expected), Files.isDirectory(actual));
+		assertEquals(expected.getFileName(), actual.getFileName());
+
+		if (Files.isDirectory(expected)) {
+			List<java.nio.file.Path> expectedContents = Files.list(expected)
+				.sorted(Comparator.comparing(java.nio.file.Path::toString))
+				.collect(Collectors.toList());
+			List<java.nio.file.Path> actualContents = Files.list(actual)
+				.sorted(Comparator.comparing(java.nio.file.Path::toString))
+				.collect(Collectors.toList());
+
+			assertEquals(expectedContents.size(), actualContents.size());
+
+			for (int x = 0; x < expectedContents.size(); x++) {
+				assertDirEquals(expectedContents.get(x), actualContents.get(x));
+			}
+		} else {
+			byte[] expectedBytes = Files.readAllBytes(expected);
+			byte[] actualBytes = Files.readAllBytes(actual);
+			assertArrayEquals(expectedBytes, actualBytes);
+		}
+	}
+
 	private static void generateRandomDirs(File dir, int numFiles, int numDirs, int depth) throws IOException {
 		// generate the random files
 		for (int i = 0; i < numFiles; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 0c0c3f3..18259d3 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -23,14 +23,17 @@ import org.apache.flink.api.common.aggregators.AggregatorRegistry;
 import org.apache.flink.api.common.aggregators.AggregatorWithName;
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
+import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.AlgorithmOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.dag.TempMode;
 import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
@@ -78,12 +81,18 @@ import org.apache.flink.runtime.operators.chaining.ChainedDriver;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.Visitor;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -91,7 +100,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
+import java.util.stream.Collectors;
 
 /**
  * This component translates the optimizer's resulting {@link org.apache.flink.optimizer.plan.OptimizedPlan}
@@ -106,6 +115,8 @@ import java.util.Map.Entry;
  */
 public class JobGraphGenerator implements Visitor<PlanNode> {
 	
+	private static final Logger LOG = LoggerFactory.getLogger(JobGraphGenerator.class);
+	
 	public static final String MERGE_ITERATION_AUX_TASKS_KEY = "compiler.merge-iteration-aux";
 	
 	private static final boolean mergeIterationAuxTasks =
@@ -243,11 +254,13 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			vertex.setSlotSharingGroup(sharingGroup);
 		}
 
-		// add registered cache file into job configuration
-		for (Entry<String, DistributedCacheEntry> e : program.getOriginalPlan().getCachedFiles()) {
-			graph.addUserArtifact(e.getKey(), e.getValue());
-		}
 
+		Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts =
+			program.getOriginalPlan().getCachedFiles().stream()
+			.map(entry -> Tuple2.of(entry.getKey(), entry.getValue()))
+			.collect(Collectors.toList());
+		addUserArtifactEntries(userArtifacts, graph);
+		
 		// release all references again
 		this.vertices = null;
 		this.chainedTasks = null;
@@ -260,6 +273,35 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		return graph;
 	}
 
+	public static void addUserArtifactEntries(Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts, JobGraph jobGraph) {
+		if (!userArtifacts.isEmpty()) {
+			try {
+				java.nio.file.Path tmpDir = Files.createTempDirectory("flink-distributed-cache-" + jobGraph.getJobID());
+				for (Tuple2<String, DistributedCache.DistributedCacheEntry> originalEntry : userArtifacts) {
+					Path filePath = new Path(originalEntry.f1.filePath);
+					boolean isLocalDir = false;
+					try {
+						FileSystem sourceFs = filePath.getFileSystem();
+						isLocalDir = !sourceFs.isDistributedFS() && sourceFs.getFileStatus(filePath).isDir();
+					} catch (IOException ioe) {
+						LOG.warn("Could not determine whether {} denotes a local path.", filePath, ioe);
+					}
+					// zip local directories because we only support file uploads
+					DistributedCache.DistributedCacheEntry entry;
+					if (isLocalDir) {
+						Path zip = FileUtils.compressDirectory(filePath, new Path(tmpDir.toString(), filePath.getName() + ".zip"));
+						entry = new DistributedCache.DistributedCacheEntry(zip.toString(), originalEntry.f1.isExecutable, true);
+					} else {
+						entry = new DistributedCache.DistributedCacheEntry(filePath.toString(), originalEntry.f1.isExecutable, false);
+					}
+					jobGraph.addUserArtifact(originalEntry.f0, entry);
+				}
+			} catch (IOException ioe) {
+				throw new FlinkRuntimeException("Could not compress distributed-cache artifacts.", ioe);
+			}
+		}
+	}
+
 	/**
 	 * This method implements the pre-visiting during a depth-first traversal. It creates the job vertex and
 	 * sets the local strategy.

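A minimal sketch of the new hook, mirroring testArtifactCompression below (the
artifact names and paths are hypothetical):

    JobGraph jobGraph = new JobGraph();
    Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> artifacts = Arrays.asList(
        Tuple2.of("plainFile", new DistributedCache.DistributedCacheEntry("/tmp/someFile", false)),
        Tuple2.of("localDir", new DistributedCache.DistributedCacheEntry("/tmp/someDir", false)));
    JobGraphGenerator.addUserArtifactEntries(artifacts, jobGraph);
    // local directories are now represented by a zip in a "flink-distributed-cache-<jobID>"
    // temp directory with isZipped == true; plain files pass through unchanged
    Map<String, DistributedCache.DistributedCacheEntry> submitted = jobGraph.getUserArtifacts();
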
http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
index 51c6a85..d58dc47 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.optimizer.plantranslate;
 
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.ResourceSpec;
@@ -36,14 +37,29 @@ import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
+import java.io.IOException;
 import java.lang.reflect.Method;
-
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 public class JobGraphGeneratorTest {
 
+	@Rule
+	public final TemporaryFolder tmp = new TemporaryFolder();
+
 	/**
 	 * Verifies that the resources are merged correctly for chained operators when
 	 * generating the job graph.
@@ -206,4 +222,55 @@ public class JobGraphGeneratorTest {
 		assertTrue(sinkVertex.getPreferredResources().equals(resource6));
 		assertTrue(iterationSyncVertex.getMinResources().equals(resource3));
 	}
+
+	@Test
+	public void testArtifactCompression() throws IOException {
+		Path plainFile1 = tmp.newFile("plainFile1").toPath();
+		Path plainFile2 = tmp.newFile("plainFile2").toPath();
+
+		Path directory1 = tmp.newFolder("directory1").toPath();
+		Files.createDirectory(directory1.resolve("containedFile1"));
+
+		Path directory2 = tmp.newFolder("directory2").toPath();
+		Files.createDirectory(directory2.resolve("containedFile2"));
+
+		JobGraph jb = new JobGraph();
+
+		final String executableFileName = "executableFile";
+		final String nonExecutableFileName = "nonExecutableFile";
+		final String executableDirName = "executableDir";
+		final String nonExecutableDirName = "nonExecutableDir";
+
+		Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> originalArtifacts = Arrays.asList(
+			Tuple2.of(executableFileName, new DistributedCache.DistributedCacheEntry(plainFile1.toString(), true)),
+			Tuple2.of(nonExecutableFileName, new DistributedCache.DistributedCacheEntry(plainFile2.toString(), false)),
+			Tuple2.of(executableDirName, new DistributedCache.DistributedCacheEntry(directory1.toString(), true)),
+			Tuple2.of(nonExecutableDirName, new DistributedCache.DistributedCacheEntry(directory2.toString(), false))
+		);
+
+		JobGraphGenerator.addUserArtifactEntries(originalArtifacts, jb);
+
+		Map<String, DistributedCache.DistributedCacheEntry> submittedArtifacts = jb.getUserArtifacts();
+
+		DistributedCache.DistributedCacheEntry executableFileEntry = submittedArtifacts.get(executableFileName);
+		assertState(executableFileEntry, true, false);
+
+		DistributedCache.DistributedCacheEntry nonExecutableFileEntry = submittedArtifacts.get(nonExecutableFileName);
+		assertState(nonExecutableFileEntry, false, false);
+
+		DistributedCache.DistributedCacheEntry executableDirEntry = submittedArtifacts.get(executableDirName);
+		assertState(executableDirEntry, true, true);
+
+		DistributedCache.DistributedCacheEntry nonExecutableDirEntry = submittedArtifacts.get(nonExecutableDirName);
+		assertState(nonExecutableDirEntry, false, true);
+	}
+
+	private static void assertState(DistributedCache.DistributedCacheEntry entry, boolean isExecutable, boolean isZipped) throws IOException {
+		assertNotNull(entry);
+		assertEquals(isExecutable, entry.isExecutable);
+		assertEquals(isZipped, entry.isZipped);
+		org.apache.flink.core.fs.Path filePath = new org.apache.flink.core.fs.Path(entry.filePath);
+		assertTrue(filePath.getFileSystem().exists(filePath));
+		assertFalse(filePath.getFileSystem().getFileStatus(filePath).isDir());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 1301740..80e36b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -21,8 +21,6 @@ package org.apache.flink.runtime.blob;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.net.SSLUtils;
@@ -49,8 +47,6 @@ import java.net.Socket;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipOutputStream;
 
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
@@ -444,42 +440,8 @@ public final class BlobClient implements Closeable {
 	 */
 	public PermanentBlobKey uploadFile(JobID jobId, Path file) throws IOException {
 		final FileSystem fs = file.getFileSystem();
-		if (fs.getFileStatus(file).isDir()) {
-			return uploadDirectory(jobId, file, fs);
-		} else {
-			try (InputStream is = fs.open(file)) {
-				return (PermanentBlobKey) putInputStream(jobId, is, PERMANENT_BLOB);
-			}
-		}
-	}
-
-	private PermanentBlobKey uploadDirectory(JobID jobId, Path file, FileSystem fs) throws IOException {
-		try (BlobOutputStream blobOutputStream = new BlobOutputStream(jobId, PERMANENT_BLOB, socket)) {
-			try (ZipOutputStream zipStream = new ZipOutputStream(blobOutputStream)) {
-				compressDirectoryToZipfile(fs, fs.getFileStatus(file), fs.getFileStatus(file), zipStream);
-				zipStream.finish();
-				return (PermanentBlobKey) blobOutputStream.finish();
-			}
-		}
-	}
-
-	private static void compressDirectoryToZipfile(FileSystem fs, FileStatus rootDir, FileStatus sourceDir, ZipOutputStream out) throws IOException {
-		for (FileStatus file : fs.listStatus(sourceDir.getPath())) {
-			LOG.info("Zipping file: {}", file);
-			if (file.isDir()) {
-				compressDirectoryToZipfile(fs, rootDir, file, out);
-			} else {
-				String entryName = file.getPath().getPath().replace(rootDir.getPath().getPath(), "");
-				LOG.info("Zipping entry: {}, file: {}, rootDir: {}", entryName, file, rootDir);
-				ZipEntry entry = new ZipEntry(entryName);
-				out.putNextEntry(entry);
-
-				try (FSDataInputStream in = fs.open(file.getPath())) {
-					IOUtils.copyBytes(in, out, false);
-				}
-				out.closeEntry();
-			}
+		try (InputStream is = fs.open(file)) {
+			return (PermanentBlobKey) putInputStream(jobId, is, PERMANENT_BLOB);
 		}
 	}
-
 }

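uploadFile now streams every path as a single file; a caller that wants to ship
a directory zips it first (a sketch, with "client", "jobId" and "directory"
assumed from context):

    Path zip = FileUtils.compressDirectory(directory, new Path(directory + ".zip"));
    PermanentBlobKey key = client.uploadFile(jobId, zip);
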
http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index fe6c6bc..0019acf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -21,15 +21,12 @@ package org.apache.flink.runtime.filecache;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.PermanentBlobService;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ShutdownHookUtil;
@@ -38,9 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -52,15 +47,17 @@ import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * The FileCache is used to create the local files for the registered cache files when a task is deployed.
- * The files will be removed when the task is unregistered after a 5 second delay.
- * A given file x will be placed in "{@code <system-tmp-dir>/tmp_<jobID>/}".
+ * The FileCache is used to access registered cache files when a task is deployed.
+ *
+ * <p>Files and zipped directories are retrieved from the {@link PermanentBlobService}. The life-cycle of these files
+ * is managed by the blob-service.
+ *
+ * <p>Retrieved directories will be expanded in "{@code <system-tmp-dir>/tmp_<jobID>/}"
+ * and deleted when the task is unregistered after a 5-second delay, unless a new task requests the file in the meantime.
  */
 public class FileCache {
 
@@ -167,7 +164,7 @@ public class FileCache {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * If the file doesn't exists locally, it will copy the file to the temp directory.
+	 * If the file doesn't exist locally, it is retrieved from the blob-service.
 	 *
 	 * @param entry The cache entry descriptor (path, executable flag)
 	 * @param jobID The ID of the job for which the file is copied.
@@ -195,20 +192,12 @@ public class FileCache {
 					nextDirectory = 0;
 				}
 
-				String sourceFile = entry.filePath;
-				int posOfSep = sourceFile.lastIndexOf("/");
-				if (posOfSep > 0) {
-					sourceFile = sourceFile.substring(posOfSep + 1);
-				}
-
-				Path target = new Path(tempDirToUse.getAbsolutePath() + "/" + sourceFile);
-
 				// kick off the copying
 				Callable<Path> cp;
 				if (entry.blobKey != null) {
-					cp = new CopyFromBlobProcess(entry, jobID, blobService, target);
+					cp = new CopyFromBlobProcess(entry, jobID, blobService, new Path(tempDirToUse.getAbsolutePath()));
 				} else {
-					cp = new CopyFromDFSProcess(entry, target);
+					cp = new CopyFromDFSProcess(entry, new Path(tempDirToUse.getAbsolutePath()));
 				}
 				FutureTask<Path> copyTask = new FutureTask<>(cp);
 				executorService.submit(copyTask);
@@ -278,26 +267,8 @@ public class FileCache {
 			final File file = blobService.getFile(jobID, blobKey);
 
 			if (isDirectory) {
-				try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
-					ZipEntry entry;
-					while ((entry = zis.getNextEntry()) != null) {
-						String fileName = entry.getName();
-						Path newFile = new Path(target, fileName);
-						if (entry.isDirectory()) {
-							target.getFileSystem().mkdirs(newFile);
-						} else {
-							try (FSDataOutputStream fsDataOutputStream = target.getFileSystem()
-									.create(newFile, FileSystem.WriteMode.NO_OVERWRITE)) {
-								IOUtils.copyBytes(zis, fsDataOutputStream, false);
-							}
-							//noinspection ResultOfMethodCallIgnored
-							new File(newFile.getPath()).setExecutable(isExecutable);
-						}
-						zis.closeEntry();
-					}
-				}
-				Files.delete(file.toPath());
-				return target;
+				Path directory = FileUtils.expandDirectory(new Path(file.getAbsolutePath()), target);
+				return directory;
 			} else {
 				//noinspection ResultOfMethodCallIgnored
 				file.setExecutable(isExecutable);
@@ -319,7 +290,15 @@ public class FileCache {
 		public CopyFromDFSProcess(DistributedCacheEntry e, Path cachedPath) {
 			this.filePath = new Path(e.filePath);
 			this.executable = e.isExecutable;
-			this.cachedPath = cachedPath;
+
+			String sourceFile = e.filePath;
+			int posOfSep = sourceFile.lastIndexOf("/");
+			if (posOfSep > 0) {
+				sourceFile = sourceFile.substring(posOfSep + 1);
+			}
+
+			this.cachedPath = new Path(cachedPath, sourceFile);
+
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 061cabe..7231383 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -637,7 +637,8 @@ public class JobGraph implements Serializable {
 						new DistributedCache.DistributedCacheEntry(
 							userArtifact.getValue().filePath,
 							userArtifact.getValue().isExecutable,
-							InstantiationUtil.serializeObject(key)),
+							InstantiationUtil.serializeObject(key),
+							userArtifact.getValue().isZipped),
 						jobConfiguration);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index 7107e24..c083d08 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
@@ -34,7 +32,6 @@ import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nullable;
 
-import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
@@ -42,15 +39,10 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
 
 import static org.apache.flink.runtime.blob.BlobCachePutTest.verifyDeletedEventually;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
@@ -468,44 +460,6 @@ public class BlobClientTest extends TestLogger {
 		uploadJarFile(getBlobServer(), getBlobClientConfig());
 	}
 
-	@Test
-	public void testDirectoryUploading() throws IOException {
-		final File newFolder = temporaryFolder.newFolder();
-		final File file1 = File.createTempFile("pre", "suff", newFolder);
-		FileUtils.writeFileUtf8(file1, "Test content");
-		final File file2 = File.createTempFile("pre", "suff", newFolder);
-		FileUtils.writeFileUtf8(file2, "Test content 2");
-
-		final Map<String, File> files = new HashMap<>();
-		files.put(file1.getName(), file1);
-		files.put(file2.getName(), file2);
-
-		BlobKey key;
-		final JobID jobId = new JobID();
-		final InetSocketAddress inetAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
-		try (
-			BlobClient client = new BlobClient(
-				inetAddress, getBlobClientConfig())) {
-
-			key = client.uploadFile(jobId, new Path(newFolder.getPath()));
-		}
-
-		final File file = getBlobServer().getFile(jobId, (PermanentBlobKey) key);
-
-		try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
-			ZipEntry entry;
-			while ((entry = zis.getNextEntry()) != null) {
-				String fileName = entry.getName().replaceFirst("/", "");
-				final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-				IOUtils.copyBytes(zis, outputStream, false);
-
-				assertEquals(FileUtils.readFileUtf8(files.get(fileName)),
-					new String(outputStream.toByteArray(), StandardCharsets.UTF_8));
-				zis.closeEntry();
-			}
-		}
-	}
-
 	/**
 	 * Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, Configuration, JobID, List)}} helper.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java
index 2c716cc..3754f46 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.blob.PermanentBlobService;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.junit.After;
@@ -39,14 +38,12 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipOutputStream;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -84,15 +81,11 @@ public class FileCacheDirectoriesTest {
 		@Override
 		public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
 			if (key.equals(permanentBlobKey)) {
-				final File zipArchive = temporaryFolder.newFile("zipArchive");
-				try (ZipOutputStream zis = new ZipOutputStream(new FileOutputStream(zipArchive))) {
-
-					final ZipEntry zipEntry = new ZipEntry("cacheFile");
-					zis.putNextEntry(zipEntry);
-
-					IOUtils.copyBytes(new ByteArrayInputStream(testFileContent.getBytes(StandardCharsets.UTF_8)), zis, false);
-				}
-				return zipArchive;
+				final java.nio.file.Path directory = temporaryFolder.newFolder("zipArchive").toPath();
+				final java.nio.file.Path containedFile = directory.resolve("cacheFile");
+				Files.copy(new ByteArrayInputStream(testFileContent.getBytes(StandardCharsets.UTF_8)), containedFile);
+				Path zipPath = FileUtils.compressDirectory(new Path(directory.toString()), new Path(directory + ".zip"));
+				return new File(zipPath.getPath());
 			} else {
 				throw new IllegalArgumentException("This service contains only an entry for " + permanentBlobKey);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index a0695ec..5b8254d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
@@ -27,6 +26,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -156,10 +156,7 @@ public class StreamingJobGraphGenerator {
 
 		configureCheckpointing();
 
-		// add registered cache file into job configuration
-		for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : streamGraph.getEnvironment().getCachedFiles()) {
-			jobGraph.addUserArtifact(e.f0, e.f1);
-		}
+		JobGraphGenerator.addUserArtifactEntries(streamGraph.getEnvironment().getCachedFiles(), jobGraph);
 
 		// set the ExecutionConfig last when it has been finalized
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 99c7acb..c2e8814 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -19,8 +19,6 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.JobExecutor;
@@ -78,10 +76,6 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 
 		jobGraph.setClasspaths(new ArrayList<>(classPaths));
 
-		for (Tuple2<String, DistributedCache.DistributedCacheEntry> file : cacheFile) {
-			jobGraph.addUserArtifact(file.f0, file.f1);
-		}
-
 		return jobExecutor.executeJobBlocking(jobGraph);
 	}