Posted to commits@flink.apache.org by ch...@apache.org on 2018/06/21 15:30:09 UTC
[2/7] flink git commit: [FLINK-9623][runtime] Move zipping logic out of blobservice
[FLINK-9623][runtime] Move zipping logic out of blobservice
This closes #6187.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f07ecc6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f07ecc6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f07ecc6
Branch: refs/heads/master
Commit: 3f07ecc652c07101ae1b1a8fb17f89a2d1206e1e
Parents: 137231a
Author: zentol <ch...@apache.org>
Authored: Mon Jun 4 13:50:44 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jun 21 14:26:09 2018 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/api/common/Plan.java | 20 +----
.../api/common/cache/DistributedCache.java | 31 +++++++-
.../java/org/apache/flink/util/FileUtils.java | 59 +++++++++++++++
.../org/apache/flink/util/FileUtilsTest.java | 78 +++++++++++++++++++-
.../plantranslate/JobGraphGenerator.java | 54 ++++++++++++--
.../plantranslate/JobGraphGeneratorTest.java | 69 ++++++++++++++++-
.../apache/flink/runtime/blob/BlobClient.java | 42 +----------
.../flink/runtime/filecache/FileCache.java | 63 ++++++----------
.../apache/flink/runtime/jobgraph/JobGraph.java | 3 +-
.../flink/runtime/blob/BlobClientTest.java | 46 ------------
.../filecache/FileCacheDirectoriesTest.java | 19 ++---
.../api/graph/StreamingJobGraphGenerator.java | 7 +-
.../streaming/util/TestStreamEnvironment.java | 6 --
13 files changed, 313 insertions(+), 184 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index db10ce4..efbc4fa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -26,17 +26,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
-import java.io.File;
import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Visitable;
import org.apache.flink.util.Visitor;
@@ -342,20 +337,7 @@ public class Plan implements Visitable<Operator<?>> {
*/
public void registerCachedFile(String name, DistributedCacheEntry entry) throws IOException {
if (!this.cacheFile.containsKey(name)) {
- try {
- URI u = new URI(entry.filePath);
- if (!u.getPath().startsWith("/")) {
- u = new File(entry.filePath).toURI();
- }
- FileSystem fs = FileSystem.get(u);
- if (fs.exists(new Path(u.getPath()))) {
- this.cacheFile.put(name, new DistributedCacheEntry(u.toString(), entry.isExecutable));
- } else {
- throw new IOException("File " + u.toString() + " doesn't exist.");
- }
- } catch (URISyntaxException ex) {
- throw new IOException("Invalid path: " + entry.filePath, ex);
- }
+ this.cacheFile.put(name, entry);
} else {
throw new IOException("cache file " + name + "already exists!");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
index 4bb4ece..2ad98fb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.Path;
import java.io.File;
import java.io.Serializable;
import java.net.URI;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -42,6 +43,10 @@ public class DistributedCache {
/**
* Meta info about an entry in {@link DistributedCache}.
+ *
+ * <p>Entries have different semantics for local directories depending on where we are in the job-submission process.
+ * After registration through the API {@code filePath} denotes the original directory.
+ * After the upload to the cluster (which includes zipping the directory), {@code filePath} denotes the (server-side) copy of the zip.
*/
public static class DistributedCacheEntry implements Serializable {
@@ -51,6 +56,17 @@ public class DistributedCache {
public byte[] blobKey;
+ /** Client-side constructor used by the API for initial registration. */
+ public DistributedCacheEntry(String filePath, Boolean isExecutable) {
+ this(filePath, isExecutable, null);
+ }
+
+ /** Client-side constructor used during job-submission for zipped directories. */
+ public DistributedCacheEntry(String filePath, boolean isExecutable, boolean isZipped) {
+ this(filePath, isExecutable, null, isZipped);
+ }
+
+ /** Server-side constructor used during job-submission for zipped directories. */
public DistributedCacheEntry(String filePath, Boolean isExecutable, byte[] blobKey, boolean isZipped) {
this.filePath = filePath;
this.isExecutable = isExecutable;
@@ -58,13 +74,20 @@ public class DistributedCache {
this.isZipped = isZipped;
}
- public DistributedCacheEntry(String filePath, Boolean isExecutable){
- this(filePath, isExecutable, null);
- }
-
+ /** Server-side constructor used during job-submission for files. */
public DistributedCacheEntry(String filePath, Boolean isExecutable, byte[] blobKey){
this(filePath, isExecutable, blobKey, false);
}
+
+ @Override
+ public String toString() {
+ return "DistributedCacheEntry{" +
+ "filePath='" + filePath + '\'' +
+ ", isExecutable=" + isExecutable +
+ ", isZipped=" + isZipped +
+ ", blobKey=" + Arrays.toString(blobKey) +
+ '}';
+ }
}
// ------------------------------------------------------------------------
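[Editor's note] The constructor split above mirrors the entry's life-cycle: the two-argument form is what the client API creates on registration, and the boolean isZipped form is used once a local directory has been zipped during job submission. A minimal sketch of that life-cycle; the paths are illustrative and not taken from the commit:

import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;

public class CacheEntryLifecycleSketch {
    public static void main(String[] args) {
        // client-side: created when the user registers a file or directory through the API
        DistributedCacheEntry registered =
            new DistributedCacheEntry("/home/user/model-dir", false);

        // client-side, during job submission: the local directory has been zipped,
        // so filePath now points at the zip and isZipped is set
        DistributedCacheEntry zipped =
            new DistributedCacheEntry("/tmp/flink-distributed-cache-example/model-dir.zip", false, true);

        // the new toString() makes both states easy to inspect in logs
        System.out.println(registered);
        System.out.println(zipped);
    }
}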
http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
index 46c5621..713ed61 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -32,6 +32,9 @@ import java.nio.file.AccessDeniedException;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Random;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -353,6 +356,62 @@ public final class FileUtils {
}
}
+ public static Path compressDirectory(Path directory, Path target) throws IOException {
+ FileSystem sourceFs = directory.getFileSystem();
+ FileSystem targetFs = target.getFileSystem();
+
+ try (ZipOutputStream out = new ZipOutputStream(targetFs.create(target, FileSystem.WriteMode.NO_OVERWRITE))) {
+ addToZip(directory, sourceFs, directory.getParent(), out);
+ }
+ return target;
+ }
+
+ private static void addToZip(Path fileOrDirectory, FileSystem fs, Path rootDir, ZipOutputStream out) throws IOException {
+ String relativePath = fileOrDirectory.getPath().replace(rootDir.getPath() + '/', "");
+ if (fs.getFileStatus(fileOrDirectory).isDir()) {
+ out.putNextEntry(new ZipEntry(relativePath + '/'));
+ for (FileStatus containedFile : fs.listStatus(fileOrDirectory)) {
+ addToZip(containedFile.getPath(), fs, rootDir, out);
+ }
+ } else {
+ ZipEntry entry = new ZipEntry(relativePath);
+ out.putNextEntry(entry);
+
+ try (FSDataInputStream in = fs.open(fileOrDirectory)) {
+ IOUtils.copyBytes(in, out, false);
+ }
+ out.closeEntry();
+ }
+ }
+
+ public static Path expandDirectory(Path file, Path targetDirectory) throws IOException {
+ FileSystem sourceFs = file.getFileSystem();
+ FileSystem targetFs = targetDirectory.getFileSystem();
+ Path rootDir = null;
+ try (ZipInputStream zis = new ZipInputStream(sourceFs.open(file))) {
+ ZipEntry entry;
+ while ((entry = zis.getNextEntry()) != null) {
+ Path relativePath = new Path(entry.getName());
+ if (rootDir == null) {
+ // the first entry contains the name of the original directory that was zipped
+ rootDir = relativePath;
+ }
+
+ Path newFile = new Path(targetDirectory, relativePath);
+ if (entry.isDirectory()) {
+ targetFs.mkdirs(newFile);
+ } else {
+ try (FSDataOutputStream fileStream = targetFs.create(newFile, FileSystem.WriteMode.NO_OVERWRITE)) {
+ // do not close the streams here as it prevents access to further zip entries
+ IOUtils.copyBytes(zis, fileStream, false);
+ }
+ }
+ zis.closeEntry();
+ }
+ }
+ return new Path(targetDirectory, rootDir);
+ }
+
// ------------------------------------------------------------------------
/**
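[Editor's note] The two helpers added above form a round trip: compressDirectory writes a single zip whose first entry is the directory name, and expandDirectory restores that layout under a target directory and returns the restored root. A minimal usage sketch, assuming the local paths below exist (they are illustrative, not taken from the commit):

import org.apache.flink.core.fs.Path;
import org.apache.flink.util.FileUtils;

public class CompressExpandSketch {
    public static void main(String[] args) throws Exception {
        Path sourceDir = new Path("/tmp/cache-src/model-dir");

        // produce a single zip file next to the source directory
        Path zip = FileUtils.compressDirectory(sourceDir, new Path("/tmp/cache-src/model-dir.zip"));

        // restore the directory elsewhere; the returned path is the restored root,
        // i.e. /tmp/cache-dst/model-dir
        Path restoredRoot = FileUtils.expandDirectory(zip, new Path("/tmp/cache-dst"));
        System.out.println(restoredRoot);
    }
}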
http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
index 4cd4814..b42ee22 100644
--- a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
@@ -27,11 +27,20 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.nio.file.AccessDeniedException;
-
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -166,10 +175,77 @@ public class FileUtilsTest {
assertFalse(parent.exists());
}
+ @Test
+ public void testCompression() throws IOException {
+ final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
+ + "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
+ + "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n"
+ + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n"
+ + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n"
+ + "hohen Werke Sind herrlich wie am ersten Tag.\n"
+ + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n"
+ + "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n"
+ + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n"
+ + "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
+ + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n"
+ + "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n"
+ + "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n"
+ + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
+
+ final java.nio.file.Path compressDir = tmp.newFolder("compressDir").toPath();
+ final java.nio.file.Path extractDir = tmp.newFolder("extractDir").toPath();
+
+ final java.nio.file.Path originalDir = Paths.get("rootDir");
+ final java.nio.file.Path emptySubDir = originalDir.resolve("emptyDir");
+ final java.nio.file.Path fullSubDir = originalDir.resolve("fullDir");
+ final java.nio.file.Path file1 = originalDir.resolve("file1");
+ final java.nio.file.Path file2 = originalDir.resolve("file2");
+ final java.nio.file.Path file3 = fullSubDir.resolve("file3");
+
+ Files.createDirectory(compressDir.resolve(originalDir));
+ Files.createDirectory(compressDir.resolve(emptySubDir));
+ Files.createDirectory(compressDir.resolve(fullSubDir));
+ Files.copy(new ByteArrayInputStream(testFileContent.getBytes(StandardCharsets.UTF_8)), compressDir.resolve(file1));
+ Files.createFile(compressDir.resolve(file2));
+ Files.copy(new ByteArrayInputStream(testFileContent.getBytes(StandardCharsets.UTF_8)), compressDir.resolve(file3));
+
+ final Path zip = FileUtils.compressDirectory(
+ new Path(compressDir.resolve(originalDir).toString()),
+ new Path(compressDir.resolve(originalDir) + ".zip"));
+
+ FileUtils.expandDirectory(zip, new Path(extractDir.toAbsolutePath().toString()));
+
+ assertDirEquals(compressDir.resolve(originalDir), extractDir.resolve(originalDir));
+ }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
+ private static void assertDirEquals(java.nio.file.Path expected, java.nio.file.Path actual) throws IOException {
+ assertEquals(Files.isDirectory(expected), Files.isDirectory(actual));
+ assertEquals(expected.getFileName(), actual.getFileName());
+
+ if (Files.isDirectory(expected)) {
+ List<java.nio.file.Path> expectedContents = Files.list(expected)
+ .sorted(Comparator.comparing(java.nio.file.Path::toString))
+ .collect(Collectors.toList());
+ List<java.nio.file.Path> actualContents = Files.list(actual)
+ .sorted(Comparator.comparing(java.nio.file.Path::toString))
+ .collect(Collectors.toList());
+
+ assertEquals(expectedContents.size(), actualContents.size());
+
+ for (int x = 0; x < expectedContents.size(); x++) {
+ assertDirEquals(expectedContents.get(x), actualContents.get(x));
+ }
+ } else {
+ byte[] expectedBytes = Files.readAllBytes(expected);
+ byte[] actualBytes = Files.readAllBytes(actual);
+ assertArrayEquals(expectedBytes, actualBytes);
+ }
+ }
+
private static void generateRandomDirs(File dir, int numFiles, int numDirs, int depth) throws IOException {
// generate the random files
for (int i = 0; i < numFiles; i++) {
http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 0c0c3f3..18259d3 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -23,14 +23,17 @@ import org.apache.flink.api.common.aggregators.AggregatorRegistry;
import org.apache.flink.api.common.aggregators.AggregatorWithName;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
+import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.AlgorithmOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.dag.TempMode;
import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
@@ -78,12 +81,18 @@ import org.apache.flink.runtime.operators.chaining.ChainedDriver;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.Visitor;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -91,7 +100,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
+import java.util.stream.Collectors;
/**
* This component translates the optimizer's resulting {@link org.apache.flink.optimizer.plan.OptimizedPlan}
@@ -106,6 +115,8 @@ import java.util.Map.Entry;
*/
public class JobGraphGenerator implements Visitor<PlanNode> {
+ private static final Logger LOG = LoggerFactory.getLogger(JobGraphGenerator.class);
+
public static final String MERGE_ITERATION_AUX_TASKS_KEY = "compiler.merge-iteration-aux";
private static final boolean mergeIterationAuxTasks =
@@ -243,11 +254,13 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
vertex.setSlotSharingGroup(sharingGroup);
}
- // add registered cache file into job configuration
- for (Entry<String, DistributedCacheEntry> e : program.getOriginalPlan().getCachedFiles()) {
- graph.addUserArtifact(e.getKey(), e.getValue());
- }
+ Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts =
+ program.getOriginalPlan().getCachedFiles().stream()
+ .map(entry -> Tuple2.of(entry.getKey(), entry.getValue()))
+ .collect(Collectors.toList());
+ addUserArtifactEntries(userArtifacts, graph);
+
// release all references again
this.vertices = null;
this.chainedTasks = null;
@@ -260,6 +273,35 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
return graph;
}
+ public static void addUserArtifactEntries(Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts, JobGraph jobGraph) {
+ if (!userArtifacts.isEmpty()) {
+ try {
+ java.nio.file.Path tmpDir = Files.createTempDirectory("flink-distributed-cache-" + jobGraph.getJobID());
+ for (Tuple2<String, DistributedCache.DistributedCacheEntry> originalEntry : userArtifacts) {
+ Path filePath = new Path(originalEntry.f1.filePath);
+ boolean isLocalDir = false;
+ try {
+ FileSystem sourceFs = filePath.getFileSystem();
+ isLocalDir = !sourceFs.isDistributedFS() && sourceFs.getFileStatus(filePath).isDir();
+ } catch (IOException ioe) {
+ LOG.warn("Could not determine whether {} denotes a local path.", filePath, ioe);
+ }
+ // zip local directories because we only support file uploads
+ DistributedCache.DistributedCacheEntry entry;
+ if (isLocalDir) {
+ Path zip = FileUtils.compressDirectory(filePath, new Path(tmpDir.toString(), filePath.getName() + ".zip"));
+ entry = new DistributedCache.DistributedCacheEntry(zip.toString(), originalEntry.f1.isExecutable, true);
+ } else {
+ entry = new DistributedCache.DistributedCacheEntry(filePath.toString(), originalEntry.f1.isExecutable, false);
+ }
+ jobGraph.addUserArtifact(originalEntry.f0, entry);
+ }
+ } catch (IOException ioe) {
+ throw new FlinkRuntimeException("Could not compress distributed-cache artifacts.", ioe);
+ }
+ }
+ }
+
/**
* This method implements the pre-visiting during a depth-first traversal. It creates the job vertex and
* sets local strategy.
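[Editor's note] addUserArtifactEntries is now the single place where registered artifacts are prepared for upload: local directories are zipped into a job-specific temp directory and registered with isZipped = true, while plain files and distributed paths pass through unchanged. A minimal sketch of a caller (the streaming-generator change and the test below use the same entry point; names and paths here are illustrative):

import java.util.Collections;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class ArtifactRegistrationSketch {
    public static void main(String[] args) {
        JobGraph jobGraph = new JobGraph();
        JobGraphGenerator.addUserArtifactEntries(
            Collections.singletonList(
                Tuple2.of("conf", new DistributedCache.DistributedCacheEntry("/tmp/conf-dir", false))),
            jobGraph);
        // a local directory ends up as a zipped entry; inspect the result
        System.out.println(jobGraph.getUserArtifacts());
    }
}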
http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
index 51c6a85..d58dc47 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.optimizer.plantranslate;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.ResourceSpec;
@@ -36,14 +37,29 @@ import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
+
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.IOException;
import java.lang.reflect.Method;
-
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class JobGraphGeneratorTest {
+ @Rule
+ public final TemporaryFolder tmp = new TemporaryFolder();
+
/**
* Verifies that the resources are merged correctly for chained operators when
* generating job graph
@@ -206,4 +222,55 @@ public class JobGraphGeneratorTest {
assertTrue(sinkVertex.getPreferredResources().equals(resource6));
assertTrue(iterationSyncVertex.getMinResources().equals(resource3));
}
+
+ @Test
+ public void testArtifactCompression() throws IOException {
+ Path plainFile1 = tmp.newFile("plainFile1").toPath();
+ Path plainFile2 = tmp.newFile("plainFile2").toPath();
+
+ Path directory1 = tmp.newFolder("directory1").toPath();
+ Files.createDirectory(directory1.resolve("containedFile1"));
+
+ Path directory2 = tmp.newFolder("directory2").toPath();
+ Files.createDirectory(directory2.resolve("containedFile2"));
+
+ JobGraph jb = new JobGraph();
+
+ final String executableFileName = "executableFile";
+ final String nonExecutableFileName = "nonExecutableFile";
+ final String executableDirName = "executableDir";
+ final String nonExecutableDirName = "nonExecutableDIr";
+
+ Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> originalArtifacts = Arrays.asList(
+ Tuple2.of(executableFileName, new DistributedCache.DistributedCacheEntry(plainFile1.toString(), true)),
+ Tuple2.of(nonExecutableFileName, new DistributedCache.DistributedCacheEntry(plainFile2.toString(), false)),
+ Tuple2.of(executableDirName, new DistributedCache.DistributedCacheEntry(directory1.toString(), true)),
+ Tuple2.of(nonExecutableDirName, new DistributedCache.DistributedCacheEntry(directory2.toString(), false))
+ );
+
+ JobGraphGenerator.addUserArtifactEntries(originalArtifacts, jb);
+
+ Map<String, DistributedCache.DistributedCacheEntry> submittedArtifacts = jb.getUserArtifacts();
+
+ DistributedCache.DistributedCacheEntry executableFileEntry = submittedArtifacts.get(executableFileName);
+ assertState(executableFileEntry, true, false);
+
+ DistributedCache.DistributedCacheEntry nonExecutableFileEntry = submittedArtifacts.get(nonExecutableFileName);
+ assertState(nonExecutableFileEntry, false, false);
+
+ DistributedCache.DistributedCacheEntry executableDirEntry = submittedArtifacts.get(executableDirName);
+ assertState(executableDirEntry, true, true);
+
+ DistributedCache.DistributedCacheEntry nonExecutableDirEntry = submittedArtifacts.get(nonExecutableDirName);
+ assertState(nonExecutableDirEntry, false, true);
+ }
+
+ private static void assertState(DistributedCache.DistributedCacheEntry entry, boolean isExecutable, boolean isZipped) throws IOException {
+ assertNotNull(entry);
+ assertEquals(isExecutable, entry.isExecutable);
+ assertEquals(isZipped, entry.isZipped);
+ org.apache.flink.core.fs.Path filePath = new org.apache.flink.core.fs.Path(entry.filePath);
+ assertTrue(filePath.getFileSystem().exists(filePath));
+ assertFalse(filePath.getFileSystem().getFileStatus(filePath).isDir());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 1301740..80e36b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -21,8 +21,6 @@ package org.apache.flink.runtime.blob;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.net.SSLUtils;
@@ -49,8 +47,6 @@ import java.net.Socket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipOutputStream;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
@@ -444,42 +440,8 @@ public final class BlobClient implements Closeable {
*/
public PermanentBlobKey uploadFile(JobID jobId, Path file) throws IOException {
final FileSystem fs = file.getFileSystem();
- if (fs.getFileStatus(file).isDir()) {
- return uploadDirectory(jobId, file, fs);
- } else {
- try (InputStream is = fs.open(file)) {
- return (PermanentBlobKey) putInputStream(jobId, is, PERMANENT_BLOB);
- }
- }
- }
-
- private PermanentBlobKey uploadDirectory(JobID jobId, Path file, FileSystem fs) throws IOException {
- try (BlobOutputStream blobOutputStream = new BlobOutputStream(jobId, PERMANENT_BLOB, socket)) {
- try (ZipOutputStream zipStream = new ZipOutputStream(blobOutputStream)) {
- compressDirectoryToZipfile(fs, fs.getFileStatus(file), fs.getFileStatus(file), zipStream);
- zipStream.finish();
- return (PermanentBlobKey) blobOutputStream.finish();
- }
- }
- }
-
- private static void compressDirectoryToZipfile(FileSystem fs, FileStatus rootDir, FileStatus sourceDir, ZipOutputStream out) throws IOException {
- for (FileStatus file : fs.listStatus(sourceDir.getPath())) {
- LOG.info("Zipping file: {}", file);
- if (file.isDir()) {
- compressDirectoryToZipfile(fs, rootDir, file, out);
- } else {
- String entryName = file.getPath().getPath().replace(rootDir.getPath().getPath(), "");
- LOG.info("Zipping entry: {}, file: {}, rootDir: {}", entryName, file, rootDir);
- ZipEntry entry = new ZipEntry(entryName);
- out.putNextEntry(entry);
-
- try (FSDataInputStream in = fs.open(file.getPath())) {
- IOUtils.copyBytes(in, out, false);
- }
- out.closeEntry();
- }
+ try (InputStream is = fs.open(file)) {
+ return (PermanentBlobKey) putInputStream(jobId, is, PERMANENT_BLOB);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index fe6c6bc..0019acf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -21,15 +21,12 @@ package org.apache.flink.runtime.filecache;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.IOUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
@@ -38,9 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
-import java.nio.file.Files;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -52,15 +47,17 @@ import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * The FileCache is used to create the local files for the registered cache files when a task is deployed.
- * The files will be removed when the task is unregistered after a 5 second delay.
- * A given file x will be placed in "{@code <system-tmp-dir>/tmp_<jobID>/}".
+ * The FileCache is used to access registered cache files when a task is deployed.
+ *
+ * <p>Files and zipped directories are retrieved from the {@link PermanentBlobService}. The life-cycle of these files
+ * is managed by the blob-service.
+ *
+ * <p>Retrieved directories will be expanded in "{@code <system-tmp-dir>/tmp_<jobID>/}"
+ * and deleted when the task is unregistered after a 5 second delay, unless a new task requests the file in the meantime.
*/
public class FileCache {
@@ -167,7 +164,7 @@ public class FileCache {
// ------------------------------------------------------------------------
/**
- * If the file doesn't exists locally, it will copy the file to the temp directory.
+ * If the file doesn't exist locally, it is retrieved from the blob-service.
*
* @param entry The cache entry descriptor (path, executable flag)
* @param jobID The ID of the job for which the file is copied.
@@ -195,20 +192,12 @@ public class FileCache {
nextDirectory = 0;
}
- String sourceFile = entry.filePath;
- int posOfSep = sourceFile.lastIndexOf("/");
- if (posOfSep > 0) {
- sourceFile = sourceFile.substring(posOfSep + 1);
- }
-
- Path target = new Path(tempDirToUse.getAbsolutePath() + "/" + sourceFile);
-
// kick off the copying
Callable<Path> cp;
if (entry.blobKey != null) {
- cp = new CopyFromBlobProcess(entry, jobID, blobService, target);
+ cp = new CopyFromBlobProcess(entry, jobID, blobService, new Path(tempDirToUse.getAbsolutePath()));
} else {
- cp = new CopyFromDFSProcess(entry, target);
+ cp = new CopyFromDFSProcess(entry, new Path(tempDirToUse.getAbsolutePath()));
}
FutureTask<Path> copyTask = new FutureTask<>(cp);
executorService.submit(copyTask);
@@ -278,26 +267,8 @@ public class FileCache {
final File file = blobService.getFile(jobID, blobKey);
if (isDirectory) {
- try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
- ZipEntry entry;
- while ((entry = zis.getNextEntry()) != null) {
- String fileName = entry.getName();
- Path newFile = new Path(target, fileName);
- if (entry.isDirectory()) {
- target.getFileSystem().mkdirs(newFile);
- } else {
- try (FSDataOutputStream fsDataOutputStream = target.getFileSystem()
- .create(newFile, FileSystem.WriteMode.NO_OVERWRITE)) {
- IOUtils.copyBytes(zis, fsDataOutputStream, false);
- }
- //noinspection ResultOfMethodCallIgnored
- new File(newFile.getPath()).setExecutable(isExecutable);
- }
- zis.closeEntry();
- }
- }
- Files.delete(file.toPath());
- return target;
+ Path directory = FileUtils.expandDirectory(new Path(file.getAbsolutePath()), target);
+ return directory;
} else {
//noinspection ResultOfMethodCallIgnored
file.setExecutable(isExecutable);
@@ -319,7 +290,15 @@ public class FileCache {
public CopyFromDFSProcess(DistributedCacheEntry e, Path cachedPath) {
this.filePath = new Path(e.filePath);
this.executable = e.isExecutable;
- this.cachedPath = cachedPath;
+
+ String sourceFile = e.filePath;
+ int posOfSep = sourceFile.lastIndexOf("/");
+ if (posOfSep > 0) {
+ sourceFile = sourceFile.substring(posOfSep + 1);
+ }
+
+ this.cachedPath = new Path(cachedPath, sourceFile);
+
}
@Override
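[Editor's note] From the task's perspective nothing changes: the FileCache fetches the zip from the blob service, expands it with FileUtils.expandDirectory under the job's temp directory, and user code keeps reading it through the runtime context. A minimal sketch, assuming an artifact registered under the illustrative name "model":

import java.io.File;
import org.apache.flink.api.common.functions.RichMapFunction;

public class UsesCachedDirSketch extends RichMapFunction<String, String> {
    @Override
    public String map(String value) throws Exception {
        // already fetched and expanded by the FileCache before the task runs
        File cachedDir = getRuntimeContext().getDistributedCache().getFile("model");
        return value + ": " + cachedDir.list().length + " entries";
    }
}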
http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 061cabe..7231383 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -637,7 +637,8 @@ public class JobGraph implements Serializable {
new DistributedCache.DistributedCacheEntry(
userArtifact.getValue().filePath,
userArtifact.getValue().isExecutable,
- InstantiationUtil.serializeObject(key)),
+ InstantiationUtil.serializeObject(key),
+ userArtifact.getValue().isZipped),
jobConfiguration);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index 7107e24..c083d08 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.IOUtils;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
@@ -34,7 +32,6 @@ import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
-import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
@@ -42,15 +39,10 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Random;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
import static org.apache.flink.runtime.blob.BlobCachePutTest.verifyDeletedEventually;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
@@ -468,44 +460,6 @@ public class BlobClientTest extends TestLogger {
uploadJarFile(getBlobServer(), getBlobClientConfig());
}
- @Test
- public void testDirectoryUploading() throws IOException {
- final File newFolder = temporaryFolder.newFolder();
- final File file1 = File.createTempFile("pre", "suff", newFolder);
- FileUtils.writeFileUtf8(file1, "Test content");
- final File file2 = File.createTempFile("pre", "suff", newFolder);
- FileUtils.writeFileUtf8(file2, "Test content 2");
-
- final Map<String, File> files = new HashMap<>();
- files.put(file1.getName(), file1);
- files.put(file2.getName(), file2);
-
- BlobKey key;
- final JobID jobId = new JobID();
- final InetSocketAddress inetAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
- try (
- BlobClient client = new BlobClient(
- inetAddress, getBlobClientConfig())) {
-
- key = client.uploadFile(jobId, new Path(newFolder.getPath()));
- }
-
- final File file = getBlobServer().getFile(jobId, (PermanentBlobKey) key);
-
- try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
- ZipEntry entry;
- while ((entry = zis.getNextEntry()) != null) {
- String fileName = entry.getName().replaceFirst("/", "");
- final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- IOUtils.copyBytes(zis, outputStream, false);
-
- assertEquals(FileUtils.readFileUtf8(files.get(fileName)),
- new String(outputStream.toByteArray(), StandardCharsets.UTF_8));
- zis.closeEntry();
- }
- }
- }
-
/**
* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, Configuration, JobID, List)}} helper.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java
index 2c716cc..3754f46 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.IOUtils;
import org.apache.flink.util.InstantiationUtil;
import org.junit.After;
@@ -39,14 +38,12 @@ import org.junit.rules.TemporaryFolder;
import java.io.ByteArrayInputStream;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipOutputStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -84,15 +81,11 @@ public class FileCacheDirectoriesTest {
@Override
public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
if (key.equals(permanentBlobKey)) {
- final File zipArchive = temporaryFolder.newFile("zipArchive");
- try (ZipOutputStream zis = new ZipOutputStream(new FileOutputStream(zipArchive))) {
-
- final ZipEntry zipEntry = new ZipEntry("cacheFile");
- zis.putNextEntry(zipEntry);
-
- IOUtils.copyBytes(new ByteArrayInputStream(testFileContent.getBytes(StandardCharsets.UTF_8)), zis, false);
- }
- return zipArchive;
+ final java.nio.file.Path directory = temporaryFolder.newFolder("zipArchive").toPath();
+ final java.nio.file.Path containedFile = directory.resolve("cacheFile");
+ Files.copy(new ByteArrayInputStream(testFileContent.getBytes(StandardCharsets.UTF_8)), containedFile);
+ Path zipPath = FileUtils.compressDirectory(new Path(directory.toString()), new Path(directory + ".zip"));
+ return new File(zipPath.getPath());
} else {
throw new IllegalArgumentException("This service contains only entry for " + permanentBlobKey);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index a0695ec..5b8254d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.graph;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
@@ -27,6 +26,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -156,10 +156,7 @@ public class StreamingJobGraphGenerator {
configureCheckpointing();
- // add registered cache file into job configuration
- for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : streamGraph.getEnvironment().getCachedFiles()) {
- jobGraph.addUserArtifact(e.f0, e.f1);
- }
+ JobGraphGenerator.addUserArtifactEntries(streamGraph.getEnvironment().getCachedFiles(), jobGraph);
// set the ExecutionConfig last when it has been finalized
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/3f07ecc6/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 99c7acb..c2e8814 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -19,8 +19,6 @@
package org.apache.flink.streaming.util;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.JobExecutor;
@@ -78,10 +76,6 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
jobGraph.setClasspaths(new ArrayList<>(classPaths));
- for (Tuple2<String, DistributedCache.DistributedCacheEntry> file : cacheFile) {
- jobGraph.addUserArtifact(file.f0, file.f1);
- }
-
return jobExecutor.executeJobBlocking(jobGraph);
}