Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2018/06/20 12:58:39 UTC

[GitHub] flink pull request #6187: [FLINK-9623][runtime] Move zipping logic out of blo...

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/6187

    [FLINK-9623][runtime] Move zipping logic out of blobservice

    ## What is the purpose of the change
    
    This PR moves the zipping logic for local user directory artifacts out of the blob service.
    Instead, directories are explicitly zipped by the `JobGraphGenerators` when compiling the graph.
    The zipping code was generalized and moved from `FileCache` and `BlobClient` to `FileUtils`.
    
    Additionally, `JobGraphTest` now extends `TestLogger`.
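
A minimal sketch of the registration step described in this section, assuming simplified types: `CacheEntry`, `DirectoryZipper` and `ArtifactRegistrationSketch` are hypothetical stand-ins, not Flink's `DistributedCache.DistributedCacheEntry` or `JobGraphGenerator` API. Local directories are replaced by a zip file and flagged as zipped, plain files pass through unchanged, and the executable flag is kept either way, which is what the `assertState(entry, executable, zipped)` calls in `JobGraphGeneratorTest` appear to check.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a distributed cache entry (not Flink's DistributedCacheEntry).
final class CacheEntry {
    private final String filePath;
    private final boolean executable;
    private final boolean zipped;

    CacheEntry(String filePath, boolean executable, boolean zipped) {
        this.filePath = filePath;
        this.executable = executable;
        this.zipped = zipped;
    }

    String filePath() { return filePath; }
    boolean isExecutable() { return executable; }
    boolean isZipped() { return zipped; }
}

// Hypothetical hook for the compression step; in the PR that logic lives in FileUtils.
@FunctionalInterface
interface DirectoryZipper {
    Path zip(Path directory) throws IOException;
}

class ArtifactRegistrationSketch {

    // Zips local directories, passes plain files through unchanged and keeps the
    // executable flag in both cases. The input order of the artifacts is preserved.
    static Map<String, CacheEntry> register(List<Map.Entry<String, CacheEntry>> artifacts,
                                            DirectoryZipper zipper) throws IOException {
        Map<String, CacheEntry> registered = new LinkedHashMap<>();
        for (Map.Entry<String, CacheEntry> artifact : artifacts) {
            CacheEntry original = artifact.getValue();
            Path path = Path.of(original.filePath());
            if (Files.isDirectory(path)) {
                Path zipFile = zipper.zip(path);
                registered.put(artifact.getKey(),
                    new CacheEntry(zipFile.toString(), original.isExecutable(), true));
            } else {
                registered.put(artifact.getKey(), original);
            }
        }
        return registered;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("plainFile", ".txt");
        Path dir = Files.createTempDirectory("directory");
        // Stand-in zipper: only creates an empty "<dir>.zip" file next to the directory.
        DirectoryZipper fakeZipper = d -> Files.createFile(Path.of(d + ".zip"));

        Map<String, CacheEntry> result = register(List.of(
            Map.entry("executableFile", new CacheEntry(file.toString(), true, false)),
            Map.entry("nonExecutableDir", new CacheEntry(dir.toString(), false, false))),
            fakeZipper);

        result.forEach((name, entry) -> System.out.println(
            name + ": zipped=" + entry.isZipped() + ", executable=" + entry.isExecutable()));
    }
}
```

Keeping the zipper behind an interface separates the decision logic (what gets zipped, which flags survive) from the compression itself, so the former can be exercised without real archives.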
    
    ## Brief change log
    
    * move zip compression/expansion as general utility methods to `FileUtils`
    * modify `[Streaming]JobGraphGenerator` to zip local user directory artifacts before registering them with the graph
    * adjust `FileCacheDirectoryTest` to not manually create a mock zip file
    * extend documentation of `FileCache`
    * extend documentation of `DistributedCache`
    * remove the redundant setting of user artifacts in `TestStreamEnvironment`
    * remove odd verification/normalization code in `Plan#registerCachedFile`, since the JobGraphGenerators now cover it
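
A sketch of general-purpose compression and expansion helpers of the kind the first bullet describes, written against `java.util.zip`. The class and method names are hypothetical; the actual `FileUtils` signatures are not shown in this thread. Entry names are stored relative to the zipped directory, and expansion rejects entries that would resolve outside the target directory. Note that `ZipOutputStream` does not record POSIX permission bits, so this sketch does not preserve executable flags.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

class ZipSketch {

    // Writes everything below `source` into the archive `target`. Entry names are
    // relative to `source`; directory entries end with '/' so empty directories survive.
    static void compressDirectory(Path source, Path target) throws IOException {
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(source)) {
            // Sorting keeps every parent ahead of its children and makes the order deterministic.
            paths = walk.filter(p -> !p.equals(source)).sorted().collect(Collectors.toList());
        }
        try (ZipOutputStream zip = new ZipOutputStream(Files.newOutputStream(target))) {
            for (Path p : paths) {
                String name = source.relativize(p).toString().replace('\\', '/');
                if (Files.isDirectory(p)) {
                    zip.putNextEntry(new ZipEntry(name + "/"));
                } else {
                    zip.putNextEntry(new ZipEntry(name));
                    Files.copy(p, zip);
                }
                zip.closeEntry();
            }
        }
    }

    // Recreates the archive contents below `targetDir` and rejects entries whose
    // names would resolve outside of it (e.g. "../evil").
    static void expandDirectory(Path archive, Path targetDir) throws IOException {
        Path root = targetDir.toAbsolutePath().normalize();
        try (ZipInputStream zip = new ZipInputStream(Files.newInputStream(archive))) {
            for (ZipEntry entry = zip.getNextEntry(); entry != null; entry = zip.getNextEntry()) {
                Path out = root.resolve(entry.getName()).normalize();
                if (!out.startsWith(root)) {
                    throw new IOException("Entry outside of target directory: " + entry.getName());
                }
                if (entry.isDirectory()) {
                    Files.createDirectories(out);
                } else {
                    Files.createDirectories(out.getParent());
                    Files.copy(zip, out); // copies only the bytes of the current entry
                }
                zip.closeEntry();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path source = Files.createTempDirectory("directory1");
        Files.createDirectories(source.resolve("nested"));
        Files.writeString(source.resolve("nested/containedFile1"), "hello");

        Path archive = Files.createTempFile("directory1", ".zip");
        compressDirectory(source, archive);

        Path restored = Files.createTempDirectory("restored");
        expandDirectory(archive, restored);
        System.out.println(Files.readString(restored.resolve("nested/containedFile1")));
    }
}
```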
    
    ## Verifying this change
    
    * zip compression/expansion is covered by new tests for `FileUtils`
    * handling of local user directory artifacts by the JobGraphGenerators is covered by new tests in `JobGraphGeneratorTest`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 9280_alpha

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6187.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6187
    
----
commit a8552a5fc85c4a3122eecd7739f5a02065dc93cd
Author: zentol <ch...@...>
Date:   2018-06-13T14:10:16Z

    [hotfix][tests] JobGraphTest extends TestLogger

commit 483ff525a7c047360a5c044fbb3c6bffb08a10c5
Author: zentol <ch...@...>
Date:   2018-06-04T11:50:44Z

    [FLINK-9623][runtime] Move zipping logic out of blobservice

----


---

[GitHub] flink pull request #6187: [FLINK-9623][runtime] Move zipping logic out of blo...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6187#discussion_r197112144
  
    --- Diff: flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java ---
    @@ -206,4 +222,54 @@ public boolean filter(Tuple2<Long, Long> value) throws Exception {
     		assertTrue(sinkVertex.getPreferredResources().equals(resource6));
     		assertTrue(iterationSyncVertex.getMinResources().equals(resource3));
     	}
    +
    +	@Test
    +	public void testArtifactCompression() throws IOException {
    +		Path plainFile1 = tmp.newFile("plainFile1").toPath();
    +		Path plainFile2 = tmp.newFile("plainFile2").toPath();
    +
    +		Path directory1 = tmp.newFolder("directory1").toPath();
    +		Files.createDirectory(directory1.resolve("containedFile1"));
    +
    +		Path directory2 = tmp.newFolder("directory2").toPath();
    +		Files.createDirectory(directory2.resolve("containedFile2"));
    +
    +		JobGraph jb = new JobGraph();
    +
    +		final String executableFileName = "executableFile";
    +		final String nonExecutableFileName = "nonExecutableFile";
    +		final String executableDirName = "executableDir";
    +		final String nonExecutableDirName = "nonExecutableDIr";
    +
    +		Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> originalArtifacts = Arrays.asList(
    +			Tuple2.of(executableFileName, new DistributedCache.DistributedCacheEntry(plainFile1.toString(), true)),
    +			Tuple2.of(nonExecutableFileName, new DistributedCache.DistributedCacheEntry(plainFile2.toString(), false)),
    +			Tuple2.of(executableDirName, new DistributedCache.DistributedCacheEntry(directory1.toString(), true)),
    +			Tuple2.of(nonExecutableDirName, new DistributedCache.DistributedCacheEntry(directory2.toString(), false))
    +		);
    +
    +		JobGraphGenerator.addUserArtifactEntries(originalArtifacts, jb);
    +
    +		Map<String, DistributedCache.DistributedCacheEntry> submittedArtifacts = jb.getUserArtifacts();
    +
    +		DistributedCache.DistributedCacheEntry executableFileEntry = submittedArtifacts.get(executableFileName);
    +		assertState(executableFileEntry, true, false);
    +
    +		DistributedCache.DistributedCacheEntry nonExecutableFileEntry = submittedArtifacts.get(nonExecutableFileName);
    +		assertState(nonExecutableFileEntry, false, false);
    +
    +		DistributedCache.DistributedCacheEntry executableDirEntry = submittedArtifacts.get(executableDirName);
    +		assertState(executableDirEntry, true, true);
    +
    +		DistributedCache.DistributedCacheEntry nonExecutableDirEntry = submittedArtifacts.get(nonExecutableDirName);
    +		assertState(nonExecutableDirEntry, false, true);
    +	}
    --- End diff --
    
    That's implicitly checked by `assertFalse(filePath.getFileSystem().getFileStatus(filePath).isDir());`, since `getFileStatus` throws an exception for a non-existent file.
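
The implicit check works because `getFileStatus` fails for a missing path, so the `isDir()` assertion can only run on an existing file. The sketch below reproduces that reasoning with `java.nio` as a stand-in for Flink's `FileSystem` (`Files.readAttributes` likewise throws for a missing path, whereas `Files.isDirectory` simply returns `false`), next to an explicit existence check of the kind promised later in the thread. The method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;

class ExistenceCheckSketch {

    // Implicit check: reading the attributes of a missing path throws, so reaching the
    // "not a directory" assertion already proves that the path exists.
    static void assertNotDirectoryImplicitly(Path path) throws IOException {
        BasicFileAttributes attrs = Files.readAttributes(path, BasicFileAttributes.class);
        if (attrs.isDirectory()) {
            throw new AssertionError("Expected a file, found a directory: " + path);
        }
    }

    // Explicit check: fail with a descriptive message before inspecting the file type.
    static void assertFileExists(Path path) {
        if (!Files.exists(path)) {
            throw new AssertionError("Expected file to exist: " + path);
        }
        if (Files.isDirectory(path)) {
            throw new AssertionError("Expected a file, found a directory: " + path);
        }
    }

    public static void main(String[] args) throws IOException {
        Path missing = Files.createTempDirectory("check").resolve("missing.zip");
        // Files.isDirectory returns false for a missing path, so on its own it proves nothing.
        System.out.println("isDirectory(missing) = " + Files.isDirectory(missing));
        try {
            assertNotDirectoryImplicitly(missing);
        } catch (IOException e) {
            System.out.println("implicit check failed with " + e.getClass().getSimpleName());
        }
        try {
            assertFileExists(missing);
        } catch (AssertionError e) {
            System.out.println("explicit check failed: " + e.getMessage());
        }
    }
}
```

The explicit variant costs one extra call but turns a missing zip into a readable assertion message instead of an I/O exception from deeper in the check.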


---

[GitHub] flink issue #6187: [FLINK-9623][runtime] Move zipping logic out of blobservic...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/6187
  
    merging.


---

[GitHub] flink pull request #6187: [FLINK-9623][runtime] Move zipping logic out of blo...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6187#discussion_r197089159
  
    
    Should we also check whether the zip file has been created?


---

[GitHub] flink pull request #6187: [FLINK-9623][runtime] Move zipping logic out of blo...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6187#discussion_r197112471
  
    
    but I'll add an explicit check.


---

[GitHub] flink pull request #6187: [FLINK-9623][runtime] Move zipping logic out of blo...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6187


---