Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2018/06/20 12:58:39 UTC
[GitHub] flink pull request #6187: [FLINK-9623][runtime] Move zipping logic out of blo...
GitHub user zentol opened a pull request:
https://github.com/apache/flink/pull/6187
[FLINK-9623][runtime] Move zipping logic out of blobservice
## What is the purpose of the change
This PR moves the zipping logic for local user directory artifacts out of the blobservice.
Instead, directories are explicitly zipped by the `JobGraphGenerators` when compiling the graph.
The zipping code was generalized and moved from `FileCache` and `BlobClient` to `FileUtils`.
Additionally, `JobGraphTest` now extends `TestLogger`.
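As a rough illustration of what general-purpose zip compression/expansion helpers of this kind could look like, here is a minimal sketch built only on the JDK (`java.util.zip` and `java.nio.file`). The class name `ZipSketch` and the method names `compressDirectory` / `expandDirectory` are assumptions made for this example; they are not taken from the PR and may not match the actual `FileUtils` signatures.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

/** Hypothetical helper for illustration; not the actual Flink FileUtils code. */
public final class ZipSketch {

    private ZipSketch() {}

    /** Writes every file and sub-directory below {@code directory} into {@code zipFile}. */
    public static Path compressDirectory(Path directory, Path zipFile) throws IOException {
        // Collect the children first so the walk is finished before the zip file is created.
        List<Path> children;
        try (Stream<Path> walk = Files.walk(directory)) {
            children = walk.filter(p -> !p.equals(directory)).collect(Collectors.toList());
        }
        try (OutputStream out = Files.newOutputStream(zipFile);
             ZipOutputStream zip = new ZipOutputStream(out)) {
            for (Path child : children) {
                // Zip entry names use '/' as the separator on every platform.
                String name = directory.relativize(child).toString().replace('\\', '/');
                if (Files.isDirectory(child)) {
                    // A trailing slash marks the entry as a directory.
                    zip.putNextEntry(new ZipEntry(name + "/"));
                } else {
                    zip.putNextEntry(new ZipEntry(name));
                    Files.copy(child, zip);
                }
                zip.closeEntry();
            }
        }
        return zipFile;
    }

    /** Restores the contents of {@code zipFile} below {@code targetDirectory}. */
    public static Path expandDirectory(Path zipFile, Path targetDirectory) throws IOException {
        Path root = targetDirectory.toAbsolutePath().normalize();
        try (InputStream in = Files.newInputStream(zipFile);
             ZipInputStream zip = new ZipInputStream(in)) {
            ZipEntry entry;
            while ((entry = zip.getNextEntry()) != null) {
                Path target = root.resolve(entry.getName()).normalize();
                // Reject entries such as "../x" that would land outside the target directory.
                if (!target.startsWith(root)) {
                    throw new IOException("Entry escapes target directory: " + entry.getName());
                }
                if (entry.isDirectory()) {
                    Files.createDirectories(target);
                } else {
                    Files.createDirectories(target.getParent());
                    Files.copy(zip, target);
                }
            }
        }
        return targetDirectory;
    }
}
```

Note that a plain zip archive written this way does not carry POSIX permission bits, so an "executable" marker has to travel separately from the archive.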
## Brief change log
* move zip compression/expansion as general utility methods to `FileUtils`
* modify `[Streaming]JobGraphGenerator` to zip local user directory artifacts before registering them with the graph
* adjust `FileCacheDirectoryTest` to not manually create a mock zip file
* extend documentation of `FileCache`
* extend documentation of `DistributedCache`
* remove setting of user-artifacts in `TestStreamEnvironment` since it is redundant
* remove odd verification/normalization code in `Plan#registerCachedFile`; the affected cases are now covered by the JobGraphGenerators
## Verifying this change
* zip compression/expansion is covered by new tests in `FileUtils`
* handling of local user directory artifacts by the JobGraphGenerators is covered by new tests in `JobGraphGeneratorTest`
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zentol/flink 9280_alpha
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6187.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6187
----
commit a8552a5fc85c4a3122eecd7739f5a02065dc93cd
Author: zentol <ch...@...>
Date: 2018-06-13T14:10:16Z
[hotfix][tests] JobGraphTest extends TestLogger
commit 483ff525a7c047360a5c044fbb3c6bffb08a10c5
Author: zentol <ch...@...>
Date: 2018-06-04T11:50:44Z
[FLINK-9623][runtime] Move zipping logic out of blobservice
----
---
[GitHub] flink pull request #6187: [FLINK-9623][runtime] Move zipping logic out of blo...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6187#discussion_r197112144
--- Diff: flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java ---
@@ -206,4 +222,54 @@ public boolean filter(Tuple2<Long, Long> value) throws Exception {
         assertTrue(sinkVertex.getPreferredResources().equals(resource6));
         assertTrue(iterationSyncVertex.getMinResources().equals(resource3));
     }
+
+    @Test
+    public void testArtifactCompression() throws IOException {
+        Path plainFile1 = tmp.newFile("plainFile1").toPath();
+        Path plainFile2 = tmp.newFile("plainFile2").toPath();
+
+        Path directory1 = tmp.newFolder("directory1").toPath();
+        Files.createDirectory(directory1.resolve("containedFile1"));
+
+        Path directory2 = tmp.newFolder("directory2").toPath();
+        Files.createDirectory(directory2.resolve("containedFile2"));
+
+        JobGraph jb = new JobGraph();
+
+        final String executableFileName = "executableFile";
+        final String nonExecutableFileName = "nonExecutableFile";
+        final String executableDirName = "executableDir";
+        final String nonExecutableDirName = "nonExecutableDIr";
+
+        Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> originalArtifacts = Arrays.asList(
+            Tuple2.of(executableFileName, new DistributedCache.DistributedCacheEntry(plainFile1.toString(), true)),
+            Tuple2.of(nonExecutableFileName, new DistributedCache.DistributedCacheEntry(plainFile2.toString(), false)),
+            Tuple2.of(executableDirName, new DistributedCache.DistributedCacheEntry(directory1.toString(), true)),
+            Tuple2.of(nonExecutableDirName, new DistributedCache.DistributedCacheEntry(directory2.toString(), false))
+        );
+
+        JobGraphGenerator.addUserArtifactEntries(originalArtifacts, jb);
+
+        Map<String, DistributedCache.DistributedCacheEntry> submittedArtifacts = jb.getUserArtifacts();
+
+        DistributedCache.DistributedCacheEntry executableFileEntry = submittedArtifacts.get(executableFileName);
+        assertState(executableFileEntry, true, false);
+
+        DistributedCache.DistributedCacheEntry nonExecutableFileEntry = submittedArtifacts.get(nonExecutableFileName);
+        assertState(nonExecutableFileEntry, false, false);
+
+        DistributedCache.DistributedCacheEntry executableDirEntry = submittedArtifacts.get(executableDirName);
+        assertState(executableDirEntry, true, true);
+
+        DistributedCache.DistributedCacheEntry nonExecutableDirEntry = submittedArtifacts.get(nonExecutableDirName);
+        assertState(nonExecutableDirEntry, false, true);
+    }
--- End diff --
That's implicitly checked by `assertFalse(filePath.getFileSystem().getFileStatus(filePath).isDir());`, which throws an exception for a non-existent file.
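For readers unfamiliar with this pattern, the following is a small analogue using only the JDK's `java.nio.file` API rather than Flink's `FileSystem`: reading a path's attributes both answers "is this a directory?" and fails loudly when the path does not exist. The class and method names are hypothetical and exist only for this sketch.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;

/** Hypothetical JDK-only analogue of an implicit existence check. */
public final class ImplicitExistenceCheck {

    private ImplicitExistenceCheck() {}

    /**
     * Returns true for a directory and false for any other existing path.
     * Throws an IOException (NoSuchFileException) if the path does not exist,
     * unlike Files.isDirectory, which would silently return false.
     */
    public static boolean isDirectoryStrict(Path path) throws IOException {
        return Files.readAttributes(path, BasicFileAttributes.class).isDirectory();
    }
}
```

An `assertFalse(isDirectoryStrict(path))` in a test would therefore fail with an exception, not pass silently, if the expected file was never created.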
---
[GitHub] flink issue #6187: [FLINK-9623][runtime] Move zipping logic out of blobservic...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/6187
merging.
---
[GitHub] flink pull request #6187: [FLINK-9623][runtime] Move zipping logic out of blo...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6187#discussion_r197089159
Should we also check whether the zip file has been created?
---
[GitHub] flink pull request #6187: [FLINK-9623][runtime] Move zipping logic out of blo...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6187#discussion_r197112471
but I'll add an explicit check.
---
[GitHub] flink pull request #6187: [FLINK-9623][runtime] Move zipping logic out of blo...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/6187
---