Posted to issues@flink.apache.org by dawidwys <gi...@git.apache.org> on 2018/02/26 13:52:20 UTC

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

GitHub user dawidwys opened a pull request:

    https://github.com/apache/flink/pull/5580

    [FLINK-8620] Enable shipping custom files to BlobStore and accessing …

    …them through DistributedCache
    
    
    *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*
    
    *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*
    
    ## Contribution Checklist
    
      - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
      
      - Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component.
      Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.
    
      - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
      
      - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices).
    
      - Each pull request should address only one issue, not mix up code from multiple issues.
      
      - Each commit in the pull request has a meaningful commit message (including the JIRA id)
    
      - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
    
    
    **(The sections below can be removed for hotfixes of typos)**
    
    ## What is the purpose of the change
    
    *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*
    
    
    ## Brief change log
    
    *(for example:)*
      - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
      - *Deployments RPC transmits only the blob storage reference*
      - *TaskManagers retrieve the TaskInfo from the blob cache*
    
    
    ## Verifying this change
    
    *(Please pick either of the following options)*
    
    This change is a trivial rework / code cleanup without any test coverage.
    
    *(or)*
    
    This change is already covered by existing tests, such as *(please describe tests)*.
    
    *(or)*
    
    This change added tests and can be verified as follows:
    
    *(example:)*
      - *Added integration tests for end-to-end deployment with large payloads (100MB)*
      - *Extended integration test for recovery after master (JobManager) failure*
      - *Added test that validates that TaskInfo is transferred only once across recoveries*
      - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
      - The serializers: (yes / no / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / no / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
      - The S3 file system connector: (yes / no / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / no)
      - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dawidwys/flink distributed-cache

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5580.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5580
    
----
commit a555a7470777d2439b0b9f6200c0030dc6e102d8
Author: Dawid Wysakowicz <dw...@...>
Date:   2018-02-07T15:21:42Z

    [FLINK-8620] Enable shipping custom files to BlobStore and accessing them through DistributedCache

----


---

[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/5580
  
    I've addressed @zentol's last comments and rebased to fix the conflicts. Will merge once Travis gives the green light.


---

[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/5580
  
    Thanks @zentol for the review.

    You are right, not supporting it in the old cluster mode would be a regression. When I first discussed this, there were some doubts whether it could work in the old mode (some hypothetical problems with blobs timing out during submission). That is why I started with the RestClusterClient, but having had another look today, I see no problem with doing it for the old cluster mode as well.

    I've also reverted the cleanup process for `FileCache`.

    Please have a look at the updated PR, if you have time. I will also rebase it shortly.


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179052173
  
    --- Diff: flink-end-to-end-tests/flink-distributed-cache-via-blob-test/pom.xml ---
    @@ -0,0 +1,106 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +    Licensed to the Apache Software Foundation (ASF) under one
    +    or more contributor license agreements.  See the NOTICE file
    +    distributed with this work for additional information
    +    regarding copyright ownership.  The ASF licenses this file
    +    to you under the Apache License, Version 2.0 (the
    +    "License"); you may not use this file except in compliance
    +    with the License.  You may obtain a copy of the License at
    +
    +      http://www.apache.org/licenses/LICENSE-2.0
    +
    +    Unless required by applicable law or agreed to in writing,
    +    software distributed under the License is distributed on an
    +    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +    KIND, either express or implied.  See the License for the
    +    specific language governing permissions and limitations
    +    under the License.
    +    -->
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +
    +	<parent>
    +		<artifactId>flink-end-to-end-tests</artifactId>
    +		<groupId>org.apache.flink</groupId>
    +		<version>1.6-SNAPSHOT</version>
    +		<relativePath>..</relativePath>
    +	</parent>
    +
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<artifactId>flink-distributed-cache-via-blob-test_${scala.binary.version}</artifactId>
    +	<name>flink-distributed-cache-via-blob</name>
    +	<packaging>jar</packaging>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-core</artifactId>
    +			<version>${project.version}</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    +			<version>${project.version}</version>
    +		</dependency>
    +	</dependencies>
    +
    +	<build>
    +		<plugins>
    +			<plugin>
    +				<groupId>org.apache.maven.plugins</groupId>
    +				<artifactId>maven-jar-plugin</artifactId>
    +				<version>2.4</version>
    +
    +				<executions>
    +					<!-- ClassLoaderTestProgram -->
    +					<execution>
    +						<id>ClassLoaderTestProgram</id>
    +						<phase>package</phase>
    +						<goals>
    +							<goal>jar</goal>
    +						</goals>
    +						<configuration>
    +							<classifier>DistributedCacheViaBlobTestProgram</classifier>
    +
    +							<archive>
    +								<manifestEntries>
    +									<program-class>org.apache.flink.streaming.tests.DistributedCacheViaBlobTestProgram</program-class>
    +								</manifestEntries>
    +							</archive>
    +
    +							<includes>
    +								<include>org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram*</include>
    +							</includes>
    +						</configuration>
    +					</execution>
    +				</executions>
    +			</plugin>
    +
    +			<!--simplify the name of the testing JARs for referring to them in the end-to-end test scripts-->
    +			<plugin>
    +				<groupId>org.apache.maven.plugins</groupId>
    +				<artifactId>maven-antrun-plugin</artifactId>
    --- End diff --
    
    this can be removed if you add a `<finalName>` to the `maven-jar-plugin` configuration (here presumably `<finalName>DistributedCacheViaBlobTestProgram</finalName>`); see the `flink-dataset-allround-test` module for an example.
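
    For reference, a rough sketch of that configuration for this module (the `finalName` value is an assumption, mirroring the `flink-dataset-allround-test` module):

    ```xml
    <plugin>
    	<groupId>org.apache.maven.plugins</groupId>
    	<artifactId>maven-jar-plugin</artifactId>
    	<version>2.4</version>
    	<configuration>
    		<!-- produces DistributedCacheViaBlobTestProgram.jar directly,
    		     making the antrun rename step unnecessary -->
    		<finalName>DistributedCacheViaBlobTestProgram</finalName>
    	</configuration>
    </plugin>
    ```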


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176661729
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
    @@ -267,101 +208,60 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log
     	// ------------------------------------------------------------------------
     
     	/**
    -	 * Asynchronous file copy process.
    -	 */
    -	private static class CopyProcess implements Callable<Path> {
    -
    -		private final Path filePath;
    -		private final Path cachedPath;
    -		private boolean executable;
    -
    -		public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
    -			this.filePath = new Path(e.filePath);
    -			this.executable = e.isExecutable;
    -			this.cachedPath = cachedPath;
    -		}
    -
    -		@Override
    -		public Path call() throws IOException {
    -			// let exceptions propagate. we can retrieve them later from
    -			// the future and report them upon access to the result
    -			copy(filePath, cachedPath, this.executable);
    -			return cachedPath;
    -		}
    -	}
    -
    -	/**
    -	 * If no task is using this file after 5 seconds, clear it.
    +	 * Asynchronous file copy process from blob server.
     	 */
    -	private static class DeleteProcess implements Runnable {
    +	private static class CopyFromBlobProcess implements Callable<Path> {
     
    -		private final Object lock;
    -		private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
    -
    -		private final String name;
    +		private final PermanentBlobKey blobKey;
    +		private final Path target;
    +		private final boolean directory;
    +		private final boolean executable;
     		private final JobID jobID;
    +		private final PermanentBlobService blobService;
     
    -		public DeleteProcess(Object lock, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries,
    -								String name, JobID jobID) {
    -			this.lock = lock;
    -			this.entries = entries;
    -			this.name = name;
    -			this.jobID = jobID;
    +		CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, Path target) {
    +			try {
    +				this.executable = e.isExecutable;
    +				this.directory = e.isZipped;
    +				this.jobID = jobID;
    +				this.blobService = blobService;
    +				this.blobKey = InstantiationUtil.deserializeObject(e.blobKey, Thread.currentThread().getContextClassLoader());
    +				this.target = target;
    +			} catch (Exception ex) {
    +				throw new RuntimeException(ex);
    +			}
     		}
     
     		@Override
    -		public void run() {
    -			try {
    -				synchronized (lock) {
    -					Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID);
    -
    -					if (jobEntries != null) {
    -						Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name);
    -
    -						if (entry != null) {
    -							int count = entry.f0;
    -							if (count > 1) {
    -								// multiple references still
    -								entry.f0 = count - 1;
    -							}
    -							else {
    -								// we remove the last reference
    -								jobEntries.remove(name);
    -								if (jobEntries.isEmpty()) {
    -									entries.remove(jobID);
    -								}
    -
    -								// abort the copy
    -								entry.f3.cancel(true);
    -
    -								// remove the file
    -								File file = new File(entry.f2.toString());
    -								if (file.exists()) {
    -									if (file.isDirectory()) {
    -										FileUtils.deleteDirectory(file);
    -									}
    -									else if (!file.delete()) {
    -										LOG.error("Could not delete locally cached file " + file.getAbsolutePath());
    -									}
    -								}
    -
    -								// remove the job wide temp directory, if it is now empty
    -								File parent = entry.f1;
    -								if (parent.isDirectory()) {
    -									String[] children = parent.list();
    -									if (children == null || children.length == 0) {
    -										//noinspection ResultOfMethodCallIgnored
    -										parent.delete();
    -									}
    -								}
    +		public Path call() throws IOException {
    +			final File file = blobService.getFile(jobID, blobKey);
    +
    +			if (directory) {
    +				try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
    +					ZipEntry entry;
    +					while ((entry = zis.getNextEntry()) != null) {
    --- End diff --
    
    the entry should be closed by calling `zis.closeEntry()`.
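
    For reference, a minimal sketch of the loop with the suggested call (target handling elided):

    ```java
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.zip.ZipEntry;
    import java.util.zip.ZipInputStream;

    static void unzip(File file) throws IOException {
    	try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
    		ZipEntry entry;
    		while ((entry = zis.getNextEntry()) != null) {
    			// entry.getName() gives the path inside the archive
    			// ... write the entry's bytes to the target path ...
    			zis.closeEntry(); // explicitly close each entry once it is consumed
    		}
    	}
    }
    ```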


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176767320
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -1831,4 +1831,5 @@ public void registerCachedFile(String filePath, String name) {
     	public void registerCachedFile(String filePath, String name, boolean executable) {
     		this.cacheFile.add(new Tuple2<>(name, new DistributedCache.DistributedCacheEntry(filePath, executable)));
     	}
    +
    --- End diff --
    
    revert


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r171181348
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -1829,6 +1830,28 @@ public void registerCachedFile(String filePath, String name) {
     	 * @param executable flag indicating whether the file should be executable
     	 */
     	public void registerCachedFile(String filePath, String name, boolean executable) {
    -		this.cacheFile.add(new Tuple2<>(name, new DistributedCache.DistributedCacheEntry(filePath, executable)));
    +		registerCachedFile(filePath, name, executable, false);
     	}
    +
    +	/**
    +	 * Registers a file at the distributed cache under the given name. The file will be accessible
    +	 * from any user-defined function in the (distributed) runtime under a local path. If upload is true files will
    +	 * be distributed via {@link BlobServer} otherwise Files should be local files (as long as all relevant workers
    +	 * have access to it), or files in a distributed file system. The runtime will copy the files temporarily to a
    +	 * local cache, if needed.
    +	 *
    +	 * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
    +	 * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
    +	 * {@link org.apache.flink.api.common.cache.DistributedCache} via
    +	 * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
    +	 *
    +	 * @param filePath The path of the file
    +	 * @param name The name under which the file is registered.
    +	 * @param executable flag indicating whether the file should be executable
    +	 * @param upload flag indicating if the file should be distributed via BlobServer
    +	 */
    +	public void registerCachedFile(String filePath, String name, boolean executable, boolean upload) {
    --- End diff --
    
    What behaviour do you suggest — sending all local files through the BlobServer and serving files from DFS as before?



---

[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5580
  
    @zentol What do you think is still needed now? I think this makes things easier for users. And we also need this for the work on the Beam Flink Runner.


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179116502
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
    @@ -262,106 +216,120 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log
     		);
     	}
     
    +	public void releaseJob(JobID jobId, ExecutionAttemptID executionId) {
    +		checkNotNull(jobId);
    +
    +		synchronized (lock) {
    +			Set<ExecutionAttemptID> jobRefCounter = jobRefHolders.get(jobId);
    +
    +			if (jobRefCounter == null || jobRefCounter.isEmpty()) {
    +				LOG.warn("improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId " + jobId);
    +				return;
    +			}
    +
    +			jobRefCounter.remove(executionId);
    --- End diff --
    
    You are right! I've added the removal in the DeleteProcess.


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r171175037
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.filecache;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.cache.DistributedCache;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.blob.BlobServer;
    +import org.apache.flink.runtime.blob.PermanentBlobKey;
    +import org.apache.flink.runtime.blob.PermanentBlobService;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.base.Charsets;
    +import org.apache.flink.shaded.guava18.com.google.common.io.Files;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +import org.junit.runner.RunWith;
    +import org.mockito.Mock;
    +import org.mockito.runners.MockitoJUnitRunner;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.concurrent.Future;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +import static org.mockito.Mockito.when;
    +
    +/**
    + * Tests that {@link FileCache} can read files from {@link BlobServer}.
    + */
    +@RunWith(MockitoJUnitRunner.class)
    +public class FileCacheReadsFromBlobTest {
    +
    +	private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
    +		+ "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
    +		+ "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n"
    +		+ "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n"
    +		+ "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n"
    +		+ "hohen Werke Sind herrlich wie am ersten Tag.\n"
    +		+ "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n"
    +		+ "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n"
    +		+ "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n"
    +		+ "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
    +		+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n"
    +		+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n"
    +		+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n"
    +		+ "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
    +
    +	@Rule
    +	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private FileCache fileCache;
    +
    +	@Mock
    +	private PermanentBlobService blobService;
    +
    +	@Before
    +	public void setup() throws IOException {
    +		try {
    +			String[] tmpDirectories = new String[]{temporaryFolder.newFolder().getAbsolutePath()};
    +			fileCache = new FileCache(tmpDirectories, blobService);
    +		} catch (Exception e) {
    +			e.printStackTrace();
    +			fail("Cannot create FileCache: " + e.getMessage());
    +		}
    +	}
    +
    +	@After
    +	public void shutdown() {
    +		try {
    +			fileCache.shutdown();
    +		} catch (Exception e) {
    +			e.printStackTrace();
    +			fail("FileCache shutdown failed: " + e.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testFileDownloadedFromBlob() {
    +		try {
    +			JobID jobID = new JobID();
    +
    +			final PermanentBlobKey permanentBlobKey = new PermanentBlobKey();
    +			when(blobService.getFile(jobID, permanentBlobKey)).thenAnswer(inv -> {
    +					File f = temporaryFolder.newFile("cacheFile");
    +					try {
    +						Files.write(testFileContent, f, Charsets.UTF_8);
    --- End diff --
    
    use `FileUtils#writeFileUtf8` instead
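
    For reference, a one-line sketch of the suggested call (assuming Flink's `org.apache.flink.util.FileUtils`):

    ```java
    FileUtils.writeFileUtf8(f, testFileContent);
    ```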


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176745753
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
    @@ -527,59 +403,83 @@ else if (response == RETURN_ERROR) {
     	 * 		Any additional configuration for the blob client
     	 * @param jobId
     	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
    -	 * @param jars
    -	 * 		List of JAR files to upload
    +	 * @param files
    +	 * 		List of files to upload
     	 *
     	 * @throws IOException
     	 * 		if the upload fails
     	 */
    -	public static List<PermanentBlobKey> uploadJarFiles(
    -			InetSocketAddress serverAddress, Configuration clientConfig, JobID jobId, List<Path> jars)
    +	public static List<PermanentBlobKey> uploadFiles(
    +			InetSocketAddress serverAddress, Configuration clientConfig, JobID jobId, List<Path> files)
     			throws IOException {
     
     		checkNotNull(jobId);
     
    -		if (jars.isEmpty()) {
    +		if (files.isEmpty()) {
     			return Collections.emptyList();
     		} else {
     			List<PermanentBlobKey> blobKeys = new ArrayList<>();
     
     			try (BlobClient blobClient = new BlobClient(serverAddress, clientConfig)) {
    -				for (final Path jar : jars) {
    -					final FileSystem fs = jar.getFileSystem();
    -					FSDataInputStream is = null;
    -					try {
    -						is = fs.open(jar);
    -						final PermanentBlobKey key =
    -							(PermanentBlobKey) blobClient.putInputStream(jobId, is, PERMANENT_BLOB);
    -						blobKeys.add(key);
    -					} finally {
    -						if (is != null) {
    -							is.close();
    -						}
    -					}
    +				for (final Path file : files) {
    +					final PermanentBlobKey key = blobClient.uploadFile(jobId, file);
    +					blobKeys.add(key);
     				}
     			}
     
     			return blobKeys;
     		}
     	}
     
    -	// --------------------------------------------------------------------------------------------
    -	//  Miscellaneous
    -	// --------------------------------------------------------------------------------------------
    -
    -	private static Throwable readExceptionFromStream(InputStream in) throws IOException {
    -		int len = readLength(in);
    -		byte[] bytes = new byte[len];
    -		readFully(in, bytes, 0, len, "Error message");
    +	/**
    +	 * Uploads a single file to the {@link PermanentBlobService} of the given {@link BlobServer}.
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
    +	 * @param file
    +	 * 		file to upload
    +	 *
    +	 * @throws IOException
    +	 * 		if the upload fails
    +	 */
    +	public PermanentBlobKey uploadFile(JobID jobId, Path file) throws IOException {
    +		final FileSystem fs = file.getFileSystem();
    +		if (fs.getFileStatus(file).isDir()) {
    +			return uploadDirectory(jobId, file, fs);
    +		} else {
    +			try (InputStream is = fs.open(file)) {
    +				return (PermanentBlobKey) putInputStream(jobId, is, PERMANENT_BLOB);
    +			}
    +		}
    +	}
     
    -		try {
    -			return (Throwable) InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader());
    +	private PermanentBlobKey uploadDirectory(JobID jobId, Path file, FileSystem fs) throws IOException {
    +		try (BlobOutputStream blobOutputStream = new BlobOutputStream(jobId, PERMANENT_BLOB, socket)) {
    +			try (ZipOutputStream zipStream = new ZipOutputStream(blobOutputStream)) {
    +				compressDirectoryToZipfile(fs, fs.getFileStatus(file), fs.getFileStatus(file), zipStream);
    +				zipStream.finish();
    +				return (PermanentBlobKey) blobOutputStream.finish();
    +			}
     		}
    -		catch (ClassNotFoundException e) {
    -			// should never occur
    -			throw new IOException("Could not transfer error message", e);
    +	}
    +
    +	private static void compressDirectoryToZipfile(FileSystem fs, FileStatus rootDir, FileStatus sourceDir, ZipOutputStream out) throws IOException {
    +		for (FileStatus file : fs.listStatus(sourceDir.getPath())) {
    +			LOG.info("Zipping file: {}", file);
    +			if (file.isDir()) {
    +				compressDirectoryToZipfile(fs, rootDir, file, out);
    +			} else {
    +				String entryName = file.getPath().getPath().replace(rootDir.getPath().getPath(), "");
    --- End diff --
    
    Unfortunately I could not. This removes only the prefix up to the root of the directory, so the relative path below it is preserved. If we have nested directories, we need more than just `getName`.
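
    To illustrate (hypothetical paths; `Path#getPath()` returns the path string, as used in the diff above):

    ```java
    import org.apache.flink.core.fs.Path;

    public class EntryNameExample {
    	public static void main(String[] args) {
    		Path rootDir = new Path("/tmp/cacheDir");
    		Path file = new Path("/tmp/cacheDir/nested/data.txt");

    		// getName() drops the intermediate directories:
    		System.out.println(file.getName()); // data.txt

    		// stripping the root prefix keeps the nested structure:
    		System.out.println(file.getPath().replace(rootDir.getPath(), "")); // /nested/data.txt
    	}
    }
    ```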


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r183702885
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java ---
    @@ -0,0 +1,209 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.filecache;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.cache.DistributedCache;
    +import org.apache.flink.core.fs.FileStatus;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.blob.PermanentBlobKey;
    +import org.apache.flink.runtime.blob.PermanentBlobService;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
    +import org.apache.flink.util.FileUtils;
    +import org.apache.flink.util.IOUtils;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.File;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.ScheduledFuture;
    +import java.util.concurrent.TimeUnit;
    +import java.util.zip.ZipEntry;
    +import java.util.zip.ZipOutputStream;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
     + * Tests that {@link FileCache} can read zipped directories from the BlobServer and properly cleans them up afterwards.
    + */
    +public class FileCacheDirectoriesTest {
    +
    +	private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
    +	+ "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
    +	+ "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n"
    +	+ "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n"
    +	+ "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n"
    +	+ "hohen Werke Sind herrlich wie am ersten Tag.\n"
    +	+ "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n"
    +	+ "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n"
    +	+ "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n"
    +	+ "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
    +	+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n"
    +	+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n"
    +	+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n"
    +	+ "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
    +
    +	@Rule
    +	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private FileCache fileCache;
    +
    +	private final PermanentBlobKey permanentBlobKey = new PermanentBlobKey();
    +
    +	private final PermanentBlobService blobService = new PermanentBlobService() {
    +		@Override
    +		public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
    +			if (key.equals(permanentBlobKey)) {
    +				final File zipArchive = temporaryFolder.newFile("zipArchive");
    +				try (ZipOutputStream zis = new ZipOutputStream(new FileOutputStream(zipArchive))) {
    +
    +					final ZipEntry zipEntry = new ZipEntry("cacheFile");
    +					zis.putNextEntry(zipEntry);
    +
    +					IOUtils.copyBytes(new ByteArrayInputStream(testFileContent.getBytes(StandardCharsets.UTF_8)), zis, false);
    +				}
    +				return zipArchive;
    +			} else {
    +				throw new IllegalArgumentException("This service contains only entry for " + permanentBlobKey);
    +			}
    +		}
    +
    +		@Override
    +		public void close() throws IOException {
    +
    +		}
    +	};
    +
    +	@Before
    +	public void setup() throws Exception {
    +		fileCache = new FileCache(new String[]{temporaryFolder.newFolder().getAbsolutePath()}, blobService);
    +	}
    +
    +	@After
    +	public void shutdown() {
    +		fileCache.shutdown();
    +	}
    +
    +	@Test
    +	public void testDirectoryDownloadedFromBlob() throws Exception {
    +		JobID jobID = new JobID();
    +		ExecutionAttemptID attemptID = new ExecutionAttemptID();
    +
    +		final String fileName = "test_file";
    +		// copy / create the file
    +		final DistributedCache.DistributedCacheEntry entry = new DistributedCache.DistributedCacheEntry(
    +			fileName,
    +			false,
    +			InstantiationUtil.serializeObject(permanentBlobKey),
    +			true);
    +		Future<Path> copyResult = fileCache.createTmpFile(fileName, entry, jobID, attemptID);
    +
    +		final Path dstPath = copyResult.get();
    +		final FileSystem fs = dstPath.getFileSystem();
    +		final FileStatus fileStatus = fs.getFileStatus(dstPath);
    +		assertTrue(fileStatus.isDir());
    +
    +		final Path cacheFile = new Path(dstPath, "cacheFile");
    +		assertTrue(fs.exists(cacheFile));
    +		final String actualContent = FileUtils.readFileUtf8(new File(cacheFile.getPath()));
    +		assertEquals(testFileContent, actualContent);
    +	}
    +
    +	@Test
    +	public void testDirectoryCleanUp() throws Exception {
    +		fileCache.shutdown();
    --- End diff --
    
    Could we not always pass a `DeleteCapturingDirectScheduledExecutorService` into the FileCache? We can explicitly call run in `@After`; that would allow us to share the filecache between both tests again.


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176658879
  
    --- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java ---
    @@ -75,15 +66,12 @@
      */
     @PublicEvolving
     public class PythonStreamExecutionEnvironment {
    -	private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
    --- End diff --
    
    I would leave it even if it isn't used


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179053003
  
    --- Diff: flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.tests;
    +
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import org.apache.commons.lang3.StringUtils;
    +
    +import java.io.File;
    +import java.nio.file.Files;
    +import java.nio.file.Paths;
    +
    +import static java.util.Collections.singletonList;
    +
    +/**
    + * End-to-end test program for verifying that files are distributed via BlobServer and later accessible through
     + * DistributedCache. We verify this by uploading a file and later accessing it in a map function. To be sure we
     + * read the version from the cache, we delete the initial file.
    + */
    +public class DistributedCacheViaBlobTestProgram {
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		final ParameterTool params = ParameterTool.fromArgs(args);
    +
    +		final String fileContent = params.getRequired("content");
    +		final String tempDir = params.getRequired("tempDir");
    +
    +		final File tempFile = File.createTempFile("temp", null, new File(tempDir));
    +		Files.write(tempFile.toPath(), singletonList(fileContent));
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		env.registerCachedFile(tempFile.getPath(), "test_data", false);
    +
    +		env.fromElements(1)
    +			.map(new TestMapFunction(tempFile.getAbsolutePath()))
    +			.writeAsText(params.getRequired("output"), FileSystem.WriteMode.OVERWRITE);
    +
    +		env.execute("Distributed Cache Via Blob Test Program");
    +	}
    +
    +	static class TestMapFunction extends RichMapFunction<Integer, String> {
    +
    +		private String initialPath;
    +
    +		public TestMapFunction(String initialPath) {
    +			this.initialPath = initialPath;
    +		}
    +
    +		@Override
    +		public String map(Integer value) throws Exception {
    +			Files.deleteIfExists(Paths.get(initialPath));
    +			return StringUtils.join(Files
    +				.readAllLines(getRuntimeContext().getDistributedCache().getFile("test_data").toPath()), "\n");
    --- End diff --
    
    this could be replaced with
    ```
    return Files.readAllLines(getRuntimeContext().getDistributedCache().getFile("test_data").toPath()).stream()
    	.collect(Collectors.joining("\n"));
    ```


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179123350
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java ---
    @@ -164,16 +165,13 @@ public void testDirectoryCleanUp() throws Exception {
     		assertTrue(fs.exists(cacheFile));
     
     		fileCache.releaseJob(jobID, attemptID2);
    -		// still should be available, file will be deleted after 5 seconds
    +		// still should be available, file will be deleted after 200 milliseconds
     		assertTrue(fileStatus.isDir());
     		assertTrue(fs.exists(cacheFile));
    --- End diff --
    
    what you _could_ also do is pass in a `ScheduledExecutorService`. You could then intercept the delete process, verify the interval arguments and fire the process at your leisure.
    
    I'm worried that in its current form the test is either not stable or takes longer than it should.
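
    A minimal sketch of that idea (hypothetical class and field names, extending the `DirectScheduledExecutorService` test utility already used here):

    ```java
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.atomic.AtomicReference;

    import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;

    class DeleteCapturingScheduledExecutorService extends DirectScheduledExecutorService {
    	final AtomicReference<Runnable> deleteTask = new AtomicReference<>();
    	final AtomicLong delayMillis = new AtomicLong(-1);

    	@Override
    	public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    		deleteTask.set(command);               // capture instead of scheduling
    		delayMillis.set(unit.toMillis(delay)); // record the interval for assertions
    		return null;                           // the test never inspects the future
    	}
    }
    ```

    The test could then assert on `delayMillis` and invoke `deleteTask.get().run()` whenever it likes, e.g. in `@After`, instead of sleeping.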


---

[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5580
  
    Ok, to unblock this, I would suggest adding the new functionality under a different name, supporting only single files (and also verifying that the path passed in is a single file). @dawidwys would that be straightforward to do?
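
    A rough sketch of that suggestion (hypothetical method name; the actual registration is elided):

    ```java
    import java.io.IOException;

    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;

    public void registerCachedFileViaBlob(String filePath, String name, boolean executable) throws IOException {
    	final Path path = new Path(filePath);
    	final FileSystem fs = path.getFileSystem();
    	// reject directories, as only single files would be supported
    	if (fs.getFileStatus(path).isDir()) {
    		throw new IllegalArgumentException("Only a single file can be shipped via the BlobServer: " + filePath);
    	}
    	// ... register the entry to be uploaded via the BlobServer ...
    }
    ```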


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176764920
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
    @@ -267,101 +208,60 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log
     	// ------------------------------------------------------------------------
     
     	/**
    -	 * Asynchronous file copy process.
    -	 */
    -	private static class CopyProcess implements Callable<Path> {
    -
    -		private final Path filePath;
    -		private final Path cachedPath;
    -		private boolean executable;
    -
    -		public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
    -			this.filePath = new Path(e.filePath);
    -			this.executable = e.isExecutable;
    -			this.cachedPath = cachedPath;
    -		}
    -
    -		@Override
    -		public Path call() throws IOException {
    -			// let exceptions propagate. we can retrieve them later from
    -			// the future and report them upon access to the result
    -			copy(filePath, cachedPath, this.executable);
    -			return cachedPath;
    -		}
    -	}
    -
    -	/**
    -	 * If no task is using this file after 5 seconds, clear it.
    +	 * Asynchronous file copy process from blob server.
     	 */
    -	private static class DeleteProcess implements Runnable {
    +	private static class CopyFromBlobProcess implements Callable<Path> {
     
    -		private final Object lock;
    -		private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
    -
    -		private final String name;
    +		private final PermanentBlobKey blobKey;
    +		private final Path target;
    +		private final boolean directory;
    +		private final boolean executable;
     		private final JobID jobID;
    +		private final PermanentBlobService blobService;
     
    -		public DeleteProcess(Object lock, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries,
    -								String name, JobID jobID) {
    -			this.lock = lock;
    -			this.entries = entries;
    -			this.name = name;
    -			this.jobID = jobID;
    +		CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, Path target) {
    +			try {
    +				this.executable = e.isExecutable;
    +				this.directory = e.isZipped;
    +				this.jobID = jobID;
    +				this.blobService = blobService;
    +				this.blobKey = InstantiationUtil.deserializeObject(e.blobKey, Thread.currentThread().getContextClassLoader());
    +				this.target = target;
    +			} catch (Exception ex) {
    +				throw new RuntimeException(ex);
    +			}
     		}
     
     		@Override
    -		public void run() {
    -			try {
    -				synchronized (lock) {
    -					Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID);
    -
    -					if (jobEntries != null) {
    -						Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name);
    -
    -						if (entry != null) {
    -							int count = entry.f0;
    -							if (count > 1) {
    -								// multiple references still
    -								entry.f0 = count - 1;
    -							}
    -							else {
    -								// we remove the last reference
    -								jobEntries.remove(name);
    -								if (jobEntries.isEmpty()) {
    -									entries.remove(jobID);
    -								}
    -
    -								// abort the copy
    -								entry.f3.cancel(true);
    -
    -								// remove the file
    -								File file = new File(entry.f2.toString());
    -								if (file.exists()) {
    -									if (file.isDirectory()) {
    -										FileUtils.deleteDirectory(file);
    -									}
    -									else if (!file.delete()) {
    -										LOG.error("Could not delete locally cached file " + file.getAbsolutePath());
    -									}
    -								}
    -
    -								// remove the job wide temp directory, if it is now empty
    -								File parent = entry.f1;
    -								if (parent.isDirectory()) {
    -									String[] children = parent.list();
    -									if (children == null || children.length == 0) {
    -										//noinspection ResultOfMethodCallIgnored
    -										parent.delete();
    -									}
    -								}
    +		public Path call() throws IOException {
    +			final File file = blobService.getFile(jobID, blobKey);
    +
    +			if (directory) {
    +				try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
    +					ZipEntry entry;
    +					while ((entry = zis.getNextEntry()) != null) {
    --- End diff --
    
    what about the last entry?


---

[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/5580
  
    @aljoscha In the end I've implemented uploading directories as zip archives, as @zentol suggested. The end-to-end Python tests pass. Let me know what you both think.


---

[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/5580
  
    @zentol I've tried changing the behaviour of `registerCachedFile` to always distribute files via the BlobServer, but I've stumbled across a problem with the Python API. Right now whole directories can be distributed via DFS, whereas we can only upload single files under a single key to the BlobServer. I wonder which would be the better solution:

    1. Extend the BlobClient to enable uploading whole directories under a single key
    2. Upload the files of a Python directory as plain files (with appropriately adjusted keys, e.g. path/to/filename) and later restore the structure on access
    
    What do you think?


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176660974
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
    @@ -527,59 +403,83 @@ else if (response == RETURN_ERROR) {
     	 * 		Any additional configuration for the blob client
     	 * @param jobId
     	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
    -	 * @param jars
    -	 * 		List of JAR files to upload
    +	 * @param files
    +	 * 		List of files to upload
     	 *
     	 * @throws IOException
     	 * 		if the upload fails
     	 */
    -	public static List<PermanentBlobKey> uploadJarFiles(
    -			InetSocketAddress serverAddress, Configuration clientConfig, JobID jobId, List<Path> jars)
    +	public static List<PermanentBlobKey> uploadFiles(
    +			InetSocketAddress serverAddress, Configuration clientConfig, JobID jobId, List<Path> files)
     			throws IOException {
     
     		checkNotNull(jobId);
     
    -		if (jars.isEmpty()) {
    +		if (files.isEmpty()) {
     			return Collections.emptyList();
     		} else {
     			List<PermanentBlobKey> blobKeys = new ArrayList<>();
     
     			try (BlobClient blobClient = new BlobClient(serverAddress, clientConfig)) {
    -				for (final Path jar : jars) {
    -					final FileSystem fs = jar.getFileSystem();
    -					FSDataInputStream is = null;
    -					try {
    -						is = fs.open(jar);
    -						final PermanentBlobKey key =
    -							(PermanentBlobKey) blobClient.putInputStream(jobId, is, PERMANENT_BLOB);
    -						blobKeys.add(key);
    -					} finally {
    -						if (is != null) {
    -							is.close();
    -						}
    -					}
    +				for (final Path file : files) {
    +					final PermanentBlobKey key = blobClient.uploadFile(jobId, file);
    +					blobKeys.add(key);
     				}
     			}
     
     			return blobKeys;
     		}
     	}
     
    -	// --------------------------------------------------------------------------------------------
    -	//  Miscellaneous
    -	// --------------------------------------------------------------------------------------------
    -
    -	private static Throwable readExceptionFromStream(InputStream in) throws IOException {
    -		int len = readLength(in);
    -		byte[] bytes = new byte[len];
    -		readFully(in, bytes, 0, len, "Error message");
    +	/**
    +	 * Uploads a single file to the {@link PermanentBlobService} of the given {@link BlobServer}.
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
    +	 * @param file
    +	 * 		file to upload
    +	 *
    +	 * @throws IOException
    +	 * 		if the upload fails
    +	 */
    +	public PermanentBlobKey uploadFile(JobID jobId, Path file) throws IOException {
    +		final FileSystem fs = file.getFileSystem();
    +		if (fs.getFileStatus(file).isDir()) {
    +			return uploadDirectory(jobId, file, fs);
    +		} else {
    +			try (InputStream is = fs.open(file)) {
    +				return (PermanentBlobKey) putInputStream(jobId, is, PERMANENT_BLOB);
    +			}
    +		}
    +	}
     
    -		try {
    -			return (Throwable) InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader());
    +	private PermanentBlobKey uploadDirectory(JobID jobId, Path file, FileSystem fs) throws IOException {
    +		try (BlobOutputStream blobOutputStream = new BlobOutputStream(jobId, PERMANENT_BLOB, socket)) {
    +			try (ZipOutputStream zipStream = new ZipOutputStream(blobOutputStream)) {
    +				compressDirectoryToZipfile(fs, fs.getFileStatus(file), fs.getFileStatus(file), zipStream);
    +				zipStream.finish();
    +				return (PermanentBlobKey) blobOutputStream.finish();
    +			}
     		}
    -		catch (ClassNotFoundException e) {
    -			// should never occur
    -			throw new IOException("Could not transfer error message", e);
    +	}
    +
    +	private static void compressDirectoryToZipfile(FileSystem fs, FileStatus rootDir, FileStatus sourceDir, ZipOutputStream out) throws IOException {
    +		for (FileStatus file : fs.listStatus(sourceDir.getPath())) {
    +			LOG.info("Zipping file: {}", file);
    +			if (file.isDir()) {
    +				compressDirectoryToZipfile(fs, rootDir, file, out);
    +			} else {
    +				String entryName = file.getPath().getPath().replace(rootDir.getPath().getPath(), "");
    --- End diff --
    
    couldn't you use `Path#getName()`?


---

[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5580
  
    So the current state of this PR doesn't work with the Python API, because that API uploads complete directories? Is that really used/needed by the Python API?
    
    I would like to have this feature in 1.5, if possible. This PR also adds an end-to-end test, which should make it very robust against future changes.


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179075409
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java ---
    @@ -33,16 +38,16 @@
     
     import java.io.File;
     import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
     import java.util.concurrent.Future;
     
    -import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertEquals;
     import static org.junit.Assert.assertTrue;
    -import static org.junit.Assert.fail;
     
     /**
    - * Test delete process of {@link FileCache}. The local cache file should not be deleted why another task comes in 5 seconds.
    + * Tests that {@link FileCache} can read files from {@link BlobServer}.
      */
    -public class FileCacheDeleteValidationTest {
    +public class FileCacheReadsFromBlobTest {
    --- End diff --
    
    as far as the FileCache is concerned, it doesn't follow a different path though, right?


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179050544
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -331,7 +332,6 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
     		CompletableFuture<JobSubmitResponseBody> submissionFuture = jobUploadFuture.thenCompose(
     			(JobGraph jobGraphToSubmit) -> {
     				log.info("Submitting job graph.");
    -
    --- End diff --
    
    revert


---

[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5580
  
    Not supporting directories would be a regression regardless of whether the Python API needs it or not.
    
    We either have to add a new method with different behavior, or zip directories under the hood before uploading them.


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r171174660
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.filecache;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.cache.DistributedCache;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.blob.BlobServer;
    +import org.apache.flink.runtime.blob.PermanentBlobKey;
    +import org.apache.flink.runtime.blob.PermanentBlobService;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.base.Charsets;
    +import org.apache.flink.shaded.guava18.com.google.common.io.Files;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +import org.junit.runner.RunWith;
    +import org.mockito.Mock;
    +import org.mockito.runners.MockitoJUnitRunner;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.concurrent.Future;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +import static org.mockito.Mockito.when;
    +
    +/**
    + * Tests that {@link FileCache} can read files from {@link BlobServer}.
    + */
    +@RunWith(MockitoJUnitRunner.class)
    +public class FileCacheReadsFromBlobTest {
    +
    +	private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
    +		+ "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
    +		+ "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n"
    +		+ "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n"
    +		+ "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n"
    +		+ "hohen Werke Sind herrlich wie am ersten Tag.\n"
    +		+ "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n"
    +		+ "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n"
    +		+ "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n"
    +		+ "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
    +		+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n"
    +		+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n"
    +		+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n"
    +		+ "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
    +
    +	@Rule
    +	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private FileCache fileCache;
    +
    +	@Mock
    +	private PermanentBlobService blobService;
    +
    +	@Before
    +	public void setup() throws IOException {
    +		try {
    +			String[] tmpDirectories = new String[]{temporaryFolder.newFolder().getAbsolutePath()};
    +			fileCache = new FileCache(tmpDirectories, blobService);
    +		} catch (Exception e) {
    +			e.printStackTrace();
    +			fail("Cannot create FileCache: " + e.getMessage());
    +		}
    +	}
    +
    +	@After
    +	public void shutdown() {
    +		try {
    +			fileCache.shutdown();
    +		} catch (Exception e) {
    +			e.printStackTrace();
    +			fail("FileCache shutdown failed: " + e.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testFileDownloadedFromBlob() {
    +		try {
    +			JobID jobID = new JobID();
    +
    +			final PermanentBlobKey permanentBlobKey = new PermanentBlobKey();
    +			when(blobService.getFile(jobID, permanentBlobKey)).thenAnswer(inv -> {
    --- End diff --
    
    Given that you only have to implement 2 methods (one of which can be empty), I'd argue we don't need a mock here.
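    
    E.g. a minimal sketch of a mock-free stub (the helper creating the test file is hypothetical):
    
        private final PermanentBlobService blobService = new PermanentBlobService() {
            @Override
            public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
                // hand out the prepared test file for the expected key
                return createTestFile(key);  // hypothetical helper
            }
    
            @Override
            public void close() throws IOException {
                // nothing to release in this stub
            }
        };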


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r171171252
  
    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java ---
    @@ -106,9 +106,9 @@
      * are created for the plan nodes, on the way back up, the nodes connect their predecessors.
      */
     public class JobGraphGenerator implements Visitor<PlanNode> {
    -	
    +
     	public static final String MERGE_ITERATION_AUX_TASKS_KEY = "compiler.merge-iteration-aux";
    -	
    --- End diff --
    
    this would be a lot easier to review without the formatting changes :/


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176658024
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -306,18 +311,20 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
     		CompletableFuture<JobGraph> jobUploadFuture = portFuture.thenCombine(
     			getDispatcherAddress(),
     			(BlobServerPortResponseBody response, String dispatcherAddress) -> {
    -				log.info("Uploading jar files.");
     				final int blobServerPort = response.port;
     				final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
     				final List<PermanentBlobKey> keys;
     				try {
    -					keys = BlobClient.uploadJarFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
    +					log.info("Uploading jar files.");
    +					keys = BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
    +					log.info("Uploading custom files.");
    --- End diff --
    
    don't think this should be logged if there's nothing to upload (which generally will be the default)
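    
    E.g. (sketch; the accessor name for the registered artifacts is an assumption):
    
        if (!jobGraph.getUserArtifacts().isEmpty()) {
            log.info("Uploading custom files.");
            // ... upload only in this case ...
        }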


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179057582
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
    @@ -631,14 +631,14 @@ else if (current == ExecutionState.CANCELING) {
     						DistributedCache.readFileInfoFromConfig(jobConfiguration))
     				{
     					LOG.info("Obtaining local cache file for '{}'.", entry.getKey());
    -					Future<Path> cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId);
    +					Future<Path> cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId, executionId);
     					distributedCacheEntries.put(entry.getKey(), cp);
     				}
     			}
     			catch (Exception e) {
     				throw new Exception(
    -					String.format("Exception while adding files to distributed cache of task %s (%s).", taskNameWithSubtask, executionId),
    --- End diff --
    
    revert


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176659832
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
    @@ -488,13 +493,36 @@ public void addJar(Path jar) {
     		return userJars;
     	}
     
    +	/**
    +	 * Adds the path of a JAR file required to run the job on a task manager.
    --- End diff --
    
    it may not be a jar file


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176662946
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java ---
    @@ -19,30 +19,38 @@
     package org.apache.flink.runtime.filecache;
     
     import org.apache.flink.api.common.JobID;
    -import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
    +import org.apache.flink.api.common.cache.DistributedCache;
     import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.blob.BlobServer;
    +import org.apache.flink.runtime.blob.PermanentBlobKey;
    +import org.apache.flink.runtime.blob.PermanentBlobService;
    +import org.apache.flink.util.FileUtils;
    +import org.apache.flink.util.InstantiationUtil;
     
    -import org.apache.flink.shaded.guava18.com.google.common.base.Charsets;
     import org.apache.flink.shaded.guava18.com.google.common.io.Files;
     
    +import org.apache.commons.lang3.StringUtils;
     import org.junit.After;
     import org.junit.Before;
     import org.junit.Rule;
     import org.junit.Test;
     import org.junit.rules.TemporaryFolder;
    +import org.junit.runner.RunWith;
    +import org.mockito.runners.MockitoJUnitRunner;
     
     import java.io.File;
     import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
     import java.util.concurrent.Future;
     
    -import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertEquals;
     import static org.junit.Assert.assertTrue;
    -import static org.junit.Assert.fail;
     
     /**
    - * Test delete process of {@link FileCache}. The local cache file should not be deleted why another task comes in 5 seconds.
    + * Tests that {@link FileCache} can read files from {@link BlobServer}.
      */
    -public class FileCacheDeleteValidationTest {
    +@RunWith(MockitoJUnitRunner.class)
    --- End diff --
    
    Why is this needed now? I don't see any mocking being added.


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176662599
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java ---
    @@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable final JobID jobId, BlobKey.Bl
     	}
     
     	/**
    -	 * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)} helper.
    +	 * Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, Configuration, JobID, List)} helper.
     	 */
     	@Test
     	public void testUploadJarFilesHelper() throws Exception {
     		uploadJarFile(getBlobServer(), getBlobClientConfig());
     	}
     
    +	@Test
    +	public void testDirectoryUploading() throws IOException {
    +		final File newFolder = temporaryFolder.newFolder();
    +		final File file1 = File.createTempFile("pre", "suff", newFolder);
    +		FileUtils.writeStringToFile(file1, "Test content");
    +		final File file2 = File.createTempFile("pre", "suff", newFolder);
    +		FileUtils.writeStringToFile(file2, "Test content 2");
    +
    +		final Map<String, File> files = new HashMap<>();
    +		files.put(file1.getName(), file1);
    +		files.put(file2.getName(), file2);
    +
    +		BlobKey key;
    +		final JobID jobId = new JobID();
    +		final InetSocketAddress inetAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
    +		try (
    +			BlobClient client = new BlobClient(
    +				inetAddress, getBlobClientConfig())) {
    +
    +			key = client.uploadFile(jobId, new Path(newFolder.getPath()));
    +		}
    +
    +		final File file = getBlobServer().getFile(jobId, (PermanentBlobKey) key);
    +
    +		try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
    +			ZipEntry entry;
    +			while ((entry = zis.getNextEntry()) != null) {
    --- End diff --
    
    entry should be closed


---

[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/5580
  
    @zentol I agree it will need to be changed in the end (though I wouldn't say rewritten completely) once the submission process is finalized.
    
    I think this change is nonetheless important from the perspective of integration with the portability layer in Beam. I would be glad to hear @aljoscha's opinion on that.


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179060824
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java ---
    @@ -33,16 +38,16 @@
     
     import java.io.File;
     import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
     import java.util.concurrent.Future;
     
    -import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertEquals;
     import static org.junit.Assert.assertTrue;
    -import static org.junit.Assert.fail;
     
     /**
    - * Test delete process of {@link FileCache}. The local cache file should not be deleted why another task comes in 5 seconds.
    + * Tests that {@link FileCache} can read files from {@link BlobServer}.
      */
    -public class FileCacheDeleteValidationTest {
    +public class FileCacheReadsFromBlobTest {
    --- End diff --
    
    am I missing something, or isn't this now a duplicate of `FileCacheDirectoryTest#testDirectoryDownloadedFromBlob`?


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176768205
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java ---
    @@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable final JobID jobId, BlobKey.Bl
     	}
     
     	/**
    -	 * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)} helper.
    +	 * Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, Configuration, JobID, List)} helper.
     	 */
     	@Test
     	public void testUploadJarFilesHelper() throws Exception {
     		uploadJarFile(getBlobServer(), getBlobClientConfig());
     	}
     
    +	@Test
    +	public void testDirectoryUploading() throws IOException {
    +		final File newFolder = temporaryFolder.newFolder();
    +		final File file1 = File.createTempFile("pre", "suff", newFolder);
    +		FileUtils.writeStringToFile(file1, "Test content");
    +		final File file2 = File.createTempFile("pre", "suff", newFolder);
    +		FileUtils.writeStringToFile(file2, "Test content 2");
    +
    +		final Map<String, File> files = new HashMap<>();
    +		files.put(file1.getName(), file1);
    +		files.put(file2.getName(), file2);
    +
    +		BlobKey key;
    +		final JobID jobId = new JobID();
    +		final InetSocketAddress inetAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
    +		try (
    +			BlobClient client = new BlobClient(
    +				inetAddress, getBlobClientConfig())) {
    +
    +			key = client.uploadFile(jobId, new Path(newFolder.getPath()));
    +		}
    +
    +		final File file = getBlobServer().getFile(jobId, (PermanentBlobKey) key);
    +
    +		try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
    +			ZipEntry entry;
    +			while ((entry = zis.getNextEntry()) != null) {
    --- End diff --
    
    that's an implementation detail though


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179052578
  
    --- Diff: flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.tests;
    +
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import org.apache.commons.lang3.StringUtils;
    --- End diff --
    
    undeclared dependency.


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r171174187
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -1829,6 +1830,28 @@ public void registerCachedFile(String filePath, String name) {
     	 * @param executable flag indicating whether the file should be executable
     	 */
     	public void registerCachedFile(String filePath, String name, boolean executable) {
    -		this.cacheFile.add(new Tuple2<>(name, new DistributedCache.DistributedCacheEntry(filePath, executable)));
    +		registerCachedFile(filePath, name, executable, false);
     	}
    +
    +	/**
    +	 * Registers a file at the distributed cache under the given name. The file will be accessible
    +	 * from any user-defined function in the (distributed) runtime under a local path. If upload is true files will
    +	 * be distributed via {@link BlobServer} otherwise Files should be local files (as long as all relevant workers
    +	 * have access to it), or files in a distributed file system. The runtime will copy the files temporarily to a
    +	 * local cache, if needed.
    +	 *
    +	 * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
    +	 * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
    +	 * {@link org.apache.flink.api.common.cache.DistributedCache} via
    +	 * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
    +	 *
    +	 * @param filePath The path of the file
    +	 * @param name The name under which the file is registered.
    +	 * @param executable flag indicating whether the file should be executable
    +	 * @param upload flag indicating if the file should be distributed via BlobServer
    +	 */
    +	public void registerCachedFile(String filePath, String name, boolean executable, boolean upload) {
    --- End diff --
    
    I'm wondering whether we really need a new method. We could just modify the behavior of the existing method, which should be transparent to the user.
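    
    A sketch of that idea (the scheme check is an assumption, not part of this PR); the existing method could delegate and decide whether to upload based on where the path points:
    
        public void registerCachedFile(String filePath, String name, boolean executable) {
            // ship scheme-less and file:// paths via the BlobServer; anything else
            // is assumed to be reachable by all workers as-is
            final String scheme = new Path(filePath).toUri().getScheme();
            final boolean upload = scheme == null || "file".equals(scheme);
            registerCachedFile(filePath, name, executable, upload);
        }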


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176767271
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/distributedcache/DistributedCacheTest.java ---
    @@ -40,6 +42,7 @@
     /**
      * Test the distributed cache.
      */
    +@Category(Flip6.class)
    --- End diff --
    
    revert?


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179116344
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java ---
    @@ -0,0 +1,182 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.filecache;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.cache.DistributedCache;
    +import org.apache.flink.core.fs.FileStatus;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.blob.PermanentBlobKey;
    +import org.apache.flink.runtime.blob.PermanentBlobService;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.util.FileUtils;
    +import org.apache.flink.util.IOUtils;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.File;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.concurrent.Future;
    +import java.util.zip.ZipEntry;
    +import java.util.zip.ZipOutputStream;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Tests that {@link FileCache} can read zipped directories from BlobServer and properly cleans them after.
    + */
    +public class FileCacheDirectoriesTest {
    +
    +	private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
    +	+ "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
    +	+ "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n"
    +	+ "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n"
    +	+ "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n"
    +	+ "hohen Werke Sind herrlich wie am ersten Tag.\n"
    +	+ "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n"
    +	+ "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n"
    +	+ "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n"
    +	+ "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
    +	+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n"
    +	+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n"
    +	+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n"
    +	+ "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
    +
    +	@Rule
    +	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private FileCache fileCache;
    +
    +	private final PermanentBlobKey permanentBlobKey = new PermanentBlobKey();
    +
    +	private final PermanentBlobService blobService = new PermanentBlobService() {
    +		@Override
    +		public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
    +			if (key.equals(permanentBlobKey)) {
    +				final File zipArchive = temporaryFolder.newFile("zipArchive");
    +				try (ZipOutputStream zis = new ZipOutputStream(new FileOutputStream(zipArchive))) {
    +
    +					final ZipEntry zipEntry = new ZipEntry("cacheFile");
    +					zis.putNextEntry(zipEntry);
    +
    +					IOUtils.copyBytes(new ByteArrayInputStream(testFileContent.getBytes(StandardCharsets.UTF_8)), zis, false);
    +				}
    +				return zipArchive;
    +			} else {
    +				throw new IllegalArgumentException("This service contains only entry for " + permanentBlobKey);
    +			}
    +		}
    +
    +		@Override
    +		public void close() throws IOException {
    +
    +		}
    +	};
    +
    +	@Before
    +	public void setup() throws Exception {
    +		fileCache = new FileCache(new String[]{temporaryFolder.newFolder().getAbsolutePath()}, blobService);
    +	}
    +
    +	@After
    +	public void shutdown() {
    +		fileCache.shutdown();
    +	}
    +
    +	@Test
    +	public void testDirectoryDownloadedFromBlob() throws Exception {
    +		JobID jobID = new JobID();
    +		ExecutionAttemptID attemptID = new ExecutionAttemptID();
    +
    +		final String fileName = "test_file";
    +		// copy / create the file
    +		final DistributedCache.DistributedCacheEntry entry = new DistributedCache.DistributedCacheEntry(
    +			fileName,
    +			false,
    +			InstantiationUtil.serializeObject(permanentBlobKey),
    +			true);
    +		Future<Path> copyResult = fileCache.createTmpFile(fileName, entry, jobID, attemptID);
    +
    +		final Path dstPath = copyResult.get();
    +		final FileSystem fs = dstPath.getFileSystem();
    +		final FileStatus fileStatus = fs.getFileStatus(dstPath);
    +		assertTrue(fileStatus.isDir());
    +
    +		final Path cacheFile = new Path(dstPath, "cacheFile");
    +		assertTrue(fs.exists(cacheFile));
    +		final String actualContent = FileUtils.readFileUtf8(new File(cacheFile.getPath()));
    +		assertEquals(testFileContent, actualContent);
    +	}
    +
    +	@Test
    +	public void testDirectoryCleanUp() throws Exception {
    +		JobID jobID = new JobID();
    +		ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
    +		ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
    +
    +		final String fileName = "test_file";
    +		// copy / create the file
    +		final DistributedCache.DistributedCacheEntry entry = new DistributedCache.DistributedCacheEntry(
    +			fileName,
    +			false,
    +			InstantiationUtil.serializeObject(permanentBlobKey),
    +			true);
    +		Future<Path> copyResult = fileCache.createTmpFile(fileName, entry, jobID, attemptID1);
    +		fileCache.createTmpFile(fileName, entry, jobID, attemptID2);
    +
    +		final Path dstPath = copyResult.get();
    +		final FileSystem fs = dstPath.getFileSystem();
    +		final FileStatus fileStatus = fs.getFileStatus(dstPath);
    +		final Path cacheFile = new Path(dstPath, "cacheFile");
    +		assertTrue(fileStatus.isDir());
    +		assertTrue(fs.exists(cacheFile));
    +
    +		fileCache.releaseJob(jobID, attemptID1);
    +		// still should be available
    +		assertTrue(fileStatus.isDir());
    +		assertTrue(fs.exists(cacheFile));
    +
    +		fileCache.releaseJob(jobID, attemptID2);
    +		// still should be available, file will be deleted after 5 seconds
    +		assertTrue(fileStatus.isDir());
    +		assertTrue(fs.exists(cacheFile));
    +
    +		// after a while, the file should disappear
    +		long deadline = System.currentTimeMillis() + 10000;
    +		do {
    +			Thread.sleep(5500);
    --- End diff --
    
    I cannot just use `fileCache.shutdown` because the shutdown performs additional clearing of temporary directories, which is not what I want to test. I've changed the implementation. I am not sure though about making the `ExecutorService` `VisibleForTesting`. Another alternative would be to revert to a similar loop, but this time with a shorter period.
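    
    For reference, a sketch of the injected-executor idea (hand-rolled, names invented); it would let the test trigger the delayed cleanup deterministically instead of sleeping:
    
        import java.util.ArrayDeque;
        import java.util.Queue;
        import java.util.concurrent.ScheduledFuture;
        import java.util.concurrent.ScheduledThreadPoolExecutor;
        import java.util.concurrent.TimeUnit;
    
        // records delayed tasks instead of putting them on a timer
        class ManuallyTriggeredExecutor extends ScheduledThreadPoolExecutor {
            private final Queue<Runnable> delayed = new ArrayDeque<>();
    
            ManuallyTriggeredExecutor() {
                super(1);
            }
    
            @Override
            public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
                delayed.add(command);
                return null;  // sketch simplification; a real stub should return a future
            }
    
            void triggerAll() {
                Runnable next;
                while ((next = delayed.poll()) != null) {
                    next.run();
                }
            }
        }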


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179079963
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java ---
    @@ -33,16 +38,16 @@
     
     import java.io.File;
     import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
     import java.util.concurrent.Future;
     
    -import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertEquals;
     import static org.junit.Assert.assertTrue;
    -import static org.junit.Assert.fail;
     
     /**
    - * Test delete process of {@link FileCache}. The local cache file should not be deleted why another task comes in 5 seconds.
    + * Tests that {@link FileCache} can read files from {@link BlobServer}.
      */
    -public class FileCacheDeleteValidationTest {
    +public class FileCacheReadsFromBlobTest {
    --- End diff --
    
    it does; it is the `FileCache`'s responsibility to unzip.


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179049139
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -231,6 +234,7 @@ public TaskExecutor(
     
     		hardwareDescription = HardwareDescription.extractFromSystem(
     			taskExecutorServices.getMemoryManager().getMemorySize());
    +
    --- End diff --
    
    revert


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176745183
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java ---
    @@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable final JobID jobId, BlobKey.Bl
     	}
     
     	/**
    -	 * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)} helper.
    +	 * Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, Configuration, JobID, List)} helper.
     	 */
     	@Test
     	public void testUploadJarFilesHelper() throws Exception {
     		uploadJarFile(getBlobServer(), getBlobClientConfig());
     	}
     
    +	@Test
    +	public void testDirectoryUploading() throws IOException {
    +		final File newFolder = temporaryFolder.newFolder();
    +		final File file1 = File.createTempFile("pre", "suff", newFolder);
    +		FileUtils.writeStringToFile(file1, "Test content");
    +		final File file2 = File.createTempFile("pre", "suff", newFolder);
    +		FileUtils.writeStringToFile(file2, "Test content 2");
    +
    +		final Map<String, File> files = new HashMap<>();
    +		files.put(file1.getName(), file1);
    +		files.put(file2.getName(), file2);
    +
    +		BlobKey key;
    +		final JobID jobId = new JobID();
    +		final InetSocketAddress inetAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
    +		try (
    +			BlobClient client = new BlobClient(
    +				inetAddress, getBlobClientConfig())) {
    +
    +			key = client.uploadFile(jobId, new Path(newFolder.getPath()));
    +		}
    +
    +		final File file = getBlobServer().getFile(jobId, (PermanentBlobKey) key);
    +
    +		try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
    +			ZipEntry entry;
    +			while ((entry = zis.getNextEntry()) != null) {
    --- End diff --
    
    It does not need to. `getNextEntry` does it, at the very beginning.
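    
    That matches the `java.util.zip.ZipInputStream` contract, so the canonical loop needs no explicit `closeEntry()`:
    
        try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
            ZipEntry entry;
            while ((entry = zis.getNextEntry()) != null) {
                // getNextEntry() closes the previous entry before positioning
                // the stream at the next one
                handleEntry(entry, zis);  // hypothetical per-entry handler
            }
        }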


---

[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

Posted by ifndef-SleePy <gi...@git.apache.org>.
Github user ifndef-SleePy commented on the issue:

    https://github.com/apache/flink/pull/5580
  
    @zentol All right, got your point. That's a problem indeed. 


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176746645
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java ---
    @@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable final JobID jobId, BlobKey.Bl
     	}
     
     	/**
    -	 * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)} helper.
    +	 * Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, Configuration, JobID, List)} helper.
     	 */
     	@Test
     	public void testUploadJarFilesHelper() throws Exception {
     		uploadJarFile(getBlobServer(), getBlobClientConfig());
     	}
     
    +	@Test
    +	public void testDirectoryUploading() throws IOException {
    +		final File newFolder = temporaryFolder.newFolder();
    +		final File file1 = File.createTempFile("pre", "suff", newFolder);
    +		FileUtils.writeStringToFile(file1, "Test content");
    +		final File file2 = File.createTempFile("pre", "suff", newFolder);
    +		FileUtils.writeStringToFile(file2, "Test content 2");
    +
    +		final Map<String, File> files = new HashMap<>();
    +		files.put(file1.getName(), file1);
    +		files.put(file2.getName(), file2);
    +
    +		BlobKey key;
    +		final JobID jobId = new JobID();
    +		final InetSocketAddress inetAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
    +		try (
    +			BlobClient client = new BlobClient(
    +				inetAddress, getBlobClientConfig())) {
    +
    +			key = client.uploadFile(jobId, new Path(newFolder.getPath()));
    +		}
    +
    +		final File file = getBlobServer().getFile(jobId, (PermanentBlobKey) key);
    +
    +		try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
    +			ZipEntry entry;
    +			while ((entry = zis.getNextEntry()) != null) {
    +				String fileName = entry.getName().replaceAll("/", "");
    --- End diff --
    
    It is not sufficient. In this test case `entry.getName` returns `/pre,,,suff`. Note the `/` at the beginning.
    
    Though I've changed `replaceAll` to `replaceFirst`.
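    
    I.e. (sketch):
    
        // "/pre...suff" -> "pre...suff"; replaceFirst takes a regex, but "/" has no
        // special meaning there, so only the leading slash is dropped
        String fileName = entry.getName().replaceFirst("/", "");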


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176747071
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
    @@ -267,101 +208,60 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log
     	// ------------------------------------------------------------------------
     
     	/**
    -	 * Asynchronous file copy process.
    -	 */
    -	private static class CopyProcess implements Callable<Path> {
    -
    -		private final Path filePath;
    -		private final Path cachedPath;
    -		private boolean executable;
    -
    -		public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
    -			this.filePath = new Path(e.filePath);
    -			this.executable = e.isExecutable;
    -			this.cachedPath = cachedPath;
    -		}
    -
    -		@Override
    -		public Path call() throws IOException {
    -			// let exceptions propagate. we can retrieve them later from
    -			// the future and report them upon access to the result
    -			copy(filePath, cachedPath, this.executable);
    -			return cachedPath;
    -		}
    -	}
    -
    -	/**
    -	 * If no task is using this file after 5 seconds, clear it.
    +	 * Asynchronous file copy process from blob server.
     	 */
    -	private static class DeleteProcess implements Runnable {
    +	private static class CopyFromBlobProcess implements Callable<Path> {
     
    -		private final Object lock;
    -		private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
    -
    -		private final String name;
    +		private final PermanentBlobKey blobKey;
    +		private final Path target;
    +		private final boolean directory;
    +		private final boolean executable;
     		private final JobID jobID;
    +		private final PermanentBlobService blobService;
     
    -		public DeleteProcess(Object lock, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries,
    -								String name, JobID jobID) {
    -			this.lock = lock;
    -			this.entries = entries;
    -			this.name = name;
    -			this.jobID = jobID;
    +		CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, Path target) {
    +			try {
    +				this.executable = e.isExecutable;
    +				this.directory = e.isZipped;
    +				this.jobID = jobID;
    +				this.blobService = blobService;
    +				this.blobKey = InstantiationUtil.deserializeObject(e.blobKey, Thread.currentThread().getContextClassLoader());
    +				this.target = target;
    +			} catch (Exception ex) {
    +				throw new RuntimeException(ex);
    +			}
     		}
     
     		@Override
    -		public void run() {
    -			try {
    -				synchronized (lock) {
    -					Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID);
    -
    -					if (jobEntries != null) {
    -						Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name);
    -
    -						if (entry != null) {
    -							int count = entry.f0;
    -							if (count > 1) {
    -								// multiple references still
    -								entry.f0 = count - 1;
    -							}
    -							else {
    -								// we remove the last reference
    -								jobEntries.remove(name);
    -								if (jobEntries.isEmpty()) {
    -									entries.remove(jobID);
    -								}
    -
    -								// abort the copy
    -								entry.f3.cancel(true);
    -
    -								// remove the file
    -								File file = new File(entry.f2.toString());
    -								if (file.exists()) {
    -									if (file.isDirectory()) {
    -										FileUtils.deleteDirectory(file);
    -									}
    -									else if (!file.delete()) {
    -										LOG.error("Could not delete locally cached file " + file.getAbsolutePath());
    -									}
    -								}
    -
    -								// remove the job wide temp directory, if it is now empty
    -								File parent = entry.f1;
    -								if (parent.isDirectory()) {
    -									String[] children = parent.list();
    -									if (children == null || children.length == 0) {
    -										//noinspection ResultOfMethodCallIgnored
    -										parent.delete();
    -									}
    -								}
    +		public Path call() throws IOException {
    +			final File file = blobService.getFile(jobID, blobKey);
    +
    +			if (directory) {
    +				try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
    +					ZipEntry entry;
    +					while ((entry = zis.getNextEntry()) != null) {
    --- End diff --
    
    It does not need to. getNextEntry does it, at the very beginning.


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176658516
  
    --- Diff: flink-end-to-end-tests/pom.xml ---
    @@ -78,6 +78,27 @@ under the License.
     							</includes>
     						</configuration>
     					</execution>
    +					<execution>
    --- End diff --
    
    this will need a rebase; each test now has its own module


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176658632
  
    --- Diff: flink-end-to-end-tests/pom.xml ---
    @@ -78,6 +78,27 @@ under the License.
     							</includes>
     						</configuration>
     					</execution>
    +					<execution>
    +						<id>DistributedCacheViaBlobTestProgram</id>
    +						<phase>package</phase>
    +						<goals>
    +							<goal>jar</goal>
    +						</goals>
    +						<configuration>
    +							<classifier>DistributedCacheViaBlobTestProgram</classifier>
    +
    +							<archive>
    +								<manifestEntries>
    +									<program-class>org.apache.flink.streaming.tests.DistributedCacheViaBlobTestProgram</program-class>
    +								</manifestEntries>
    +							</archive>
    +
    +							<includes>
    +								<include>org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.class</include>
    --- End diff --
    
    This can use wildcards, no? `org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram* `


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179053801
  
    --- Diff: flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.tests;
    +
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import org.apache.commons.lang3.StringUtils;
    +
    +import java.io.File;
    +import java.nio.file.Files;
    +import java.nio.file.Paths;
    +
    +import static java.util.Collections.singletonList;
    +
    +/**
    + * End-to-end test program for verifying that files are distributed via BlobServer and later accessible through
    + * DistributedCache. We verify that by uploading a file and later accessing it in a map function. To be sure we
    + * read the version from the cache, we delete the initial file.
    + */
    +public class DistributedCacheViaBlobTestProgram {
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		final ParameterTool params = ParameterTool.fromArgs(args);
    +
    +		final String fileContent = params.getRequired("content");
    +		final String tempDir = params.getRequired("tempDir");
    +
    +		final File tempFile = File.createTempFile("temp", null, new File(tempDir));
    +		Files.write(tempFile.toPath(), singletonList(fileContent));
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		env.registerCachedFile(tempFile.getPath(), "test_data", false);
    +
    +		env.fromElements(1)
    +			.map(new TestMapFunction(tempFile.getAbsolutePath()))
    +			.writeAsText(params.getRequired("output"), FileSystem.WriteMode.OVERWRITE);
    +
    +		env.execute("Distributed Cache Via Blob Test Program");
    +	}
    +
    +	static class TestMapFunction extends RichMapFunction<Integer, String> {
    +
    +		private String initialPath;
    +
    +		public TestMapFunction(String initialPath) {
    +			this.initialPath = initialPath;
    +		}
    +
    +		@Override
    +		public String map(Integer value) throws Exception {
    +			Files.deleteIfExists(Paths.get(initialPath));
    --- End diff --
    
    this seems icky. could we not compare the paths instead to make sure we're accessing a different file?
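    
    E.g. a sketch of such a check inside the map function (imports elided; `initialPath` as in the diff):
    
        @Override
        public String map(Integer value) throws Exception {
            final File cached = getRuntimeContext().getDistributedCache().getFile("test_data");
            // the cached copy must live under the task manager's cache directory,
            // i.e. at a different path than the originally registered file
            if (cached.getAbsolutePath().equals(initialPath)) {
                throw new IllegalStateException("Read the original file instead of the cached copy");
            }
            return new String(Files.readAllBytes(cached.toPath()), StandardCharsets.UTF_8);
        }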


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179054280
  
    --- Diff: flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.tests;
    +
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import org.apache.commons.lang3.StringUtils;
    +
    +import java.io.File;
    +import java.nio.file.Files;
    +import java.nio.file.Paths;
    +
    +import static java.util.Collections.singletonList;
    +
    +/**
    + * End-to-end test program for verifying that files are distributed via BlobServer and later accessible through
    + * DistributedCache. We verify that by uploading a file and later accessing it in a map function. To be sure we
    + * read the version from the cache, we delete the initial file.
    + */
    +public class DistributedCacheViaBlobTestProgram {
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		final ParameterTool params = ParameterTool.fromArgs(args);
    +
    +		final String fileContent = params.getRequired("content");
    +		final String tempDir = params.getRequired("tempDir");
    +
    +		final File tempFile = File.createTempFile("temp", null, new File(tempDir));
    --- End diff --
    
    use a random name instead, in case the tempDir isn't randomized.
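    
    E.g. (sketch, `UUID` import elided):
    
        final File tempFile = new File(tempDir, "temp-" + UUID.randomUUID());
        Files.write(tempFile.toPath(), singletonList(fileContent));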


---

[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5580
  
    I think that's a very good solution in the end that even simplifies things. What do you think, @zentol ?


---

[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

Posted by ifndef-SleePy <gi...@git.apache.org>.
Github user ifndef-SleePy commented on the issue:

    https://github.com/apache/flink/pull/5580
  
    Hi @zentol 
    
    In FLIP-6 the client communicates with the cluster via the REST API, that's true. However, this does not include blobs. Currently, in the `RestClusterClient.submit()` method, the client uses the blob service to upload the user's jars. So this PR does not break the rule.
    
    I think this feature is quite good. Spark, and even MapReduce, support uploading user-defined jars/files/archives. I think we should support it as well. Users would be able to migrate to Flink more smoothly with this feature.
    
    What do you think?


---

[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

Posted by ifndef-SleePy <gi...@git.apache.org>.
Github user ifndef-SleePy commented on the issue:

    https://github.com/apache/flink/pull/5580
  
    If some day the behavior of `RestClusterClient` changes, e.g. to uploading the jars via the REST API, we could just upload the user-defined files via the REST API as well. That's not a big problem, right?


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176657746
  
    --- Diff: flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java ---
    @@ -30,6 +33,7 @@
     /**
      * Tests for the PythonPlanBinder.
      */
    +@Category(Flip6.class)
    --- End diff --
    
    why does this not run anymore against legacy clusters?


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179087145
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java ---
    @@ -33,16 +38,16 @@
     
     import java.io.File;
     import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
     import java.util.concurrent.Future;
     
    -import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertEquals;
     import static org.junit.Assert.assertTrue;
    -import static org.junit.Assert.fail;
     
     /**
    - * Test delete process of {@link FileCache}. The local cache file should not be deleted why another task comes in 5 seconds.
    + * Tests that {@link FileCache} can read files from {@link BlobServer}.
      */
    -public class FileCacheDeleteValidationTest {
    +public class FileCacheReadsFromBlobTest {
    --- End diff --
    
    ah yes, I was misreading the test...


---

[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/5580
  
    @zentol I've rebased this PR. Feel free to review whenever you find some time.


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176766216
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
    @@ -267,101 +208,60 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log
     	// ------------------------------------------------------------------------
     
     	/**
    -	 * Asynchronous file copy process.
    -	 */
    -	private static class CopyProcess implements Callable<Path> {
    -
    -		private final Path filePath;
    -		private final Path cachedPath;
    -		private boolean executable;
    -
    -		public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
    -			this.filePath = new Path(e.filePath);
    -			this.executable = e.isExecutable;
    -			this.cachedPath = cachedPath;
    -		}
    -
    -		@Override
    -		public Path call() throws IOException {
    -			// let exceptions propagate. we can retrieve them later from
    -			// the future and report them upon access to the result
    -			copy(filePath, cachedPath, this.executable);
    -			return cachedPath;
    -		}
    -	}
    -
    -	/**
    -	 * If no task is using this file after 5 seconds, clear it.
    +	 * Asynchronous file copy process from blob server.
     	 */
    -	private static class DeleteProcess implements Runnable {
    +	private static class CopyFromBlobProcess implements Callable<Path> {
     
    -		private final Object lock;
    -		private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
    -
    -		private final String name;
    +		private final PermanentBlobKey blobKey;
    +		private final Path target;
    +		private final boolean directory;
    +		private final boolean executable;
     		private final JobID jobID;
    +		private final PermanentBlobService blobService;
     
    -		public DeleteProcess(Object lock, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries,
    -								String name, JobID jobID) {
    -			this.lock = lock;
    -			this.entries = entries;
    -			this.name = name;
    -			this.jobID = jobID;
    +		CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, Path target) {
    +			try {
    +				this.executable = e.isExecutable;
    +				this.directory = e.isZipped;
    +				this.jobID = jobID;
    +				this.blobService = blobService;
    +				this.blobKey = InstantiationUtil.deserializeObject(e.blobKey, Thread.currentThread().getContextClassLoader());
    +				this.target = target;
    +			} catch (Exception ex) {
    +				throw new RuntimeException(ex);
    +			}
     		}
     
     		@Override
    -		public void run() {
    -			try {
    -				synchronized (lock) {
    -					Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID);
    -
    -					if (jobEntries != null) {
    -						Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name);
    -
    -						if (entry != null) {
    -							int count = entry.f0;
    -							if (count > 1) {
    -								// multiple references still
    -								entry.f0 = count - 1;
    -							}
    -							else {
    -								// we remove the last reference
    -								jobEntries.remove(name);
    -								if (jobEntries.isEmpty()) {
    -									entries.remove(jobID);
    -								}
    -
    -								// abort the copy
    -								entry.f3.cancel(true);
    -
    -								// remove the file
    -								File file = new File(entry.f2.toString());
    -								if (file.exists()) {
    -									if (file.isDirectory()) {
    -										FileUtils.deleteDirectory(file);
    -									}
    -									else if (!file.delete()) {
    -										LOG.error("Could not delete locally cached file " + file.getAbsolutePath());
    -									}
    -								}
    -
    -								// remove the job wide temp directory, if it is now empty
    -								File parent = entry.f1;
    -								if (parent.isDirectory()) {
    -									String[] children = parent.list();
    -									if (children == null || children.length == 0) {
    -										//noinspection ResultOfMethodCallIgnored
    -										parent.delete();
    -									}
    -								}
    +		public Path call() throws IOException {
    +			final File file = blobService.getFile(jobID, blobKey);
    +
    +			if (directory) {
    +				try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
    +					ZipEntry entry;
    +					while ((entry = zis.getNextEntry()) != null) {
    --- End diff --
    
    However, one could argue that this is an implementation detail.


---

[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/5580
  
    R: @aljoscha 


---

[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5580
  
    As far as I know, one of the end goals of FLIP-6 is to have all client-cluster communication go through the REST API. This implies that the client cannot submit things directly to the blob service.
    
    I'm wondering whether it really makes sense to change the current behavior, as any change made would effectively be deprecated on arrival.


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176662637
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java ---
    @@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable final JobID jobId, BlobKey.Bl
     	}
     
     	/**
    -	 * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)} helper.
    +	 * Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, Configuration, JobID, List)} helper.
     	 */
     	@Test
     	public void testUploadJarFilesHelper() throws Exception {
     		uploadJarFile(getBlobServer(), getBlobClientConfig());
     	}
     
    +	@Test
    +	public void testDirectoryUploading() throws IOException {
    +		final File newFolder = temporaryFolder.newFolder();
    +		final File file1 = File.createTempFile("pre", "suff", newFolder);
    +		FileUtils.writeStringToFile(file1, "Test content");
    +		final File file2 = File.createTempFile("pre", "suff", newFolder);
    +		FileUtils.writeStringToFile(file2, "Test content 2");
    +
    +		final Map<String, File> files = new HashMap<>();
    +		files.put(file1.getName(), file1);
    +		files.put(file2.getName(), file2);
    +
    +		BlobKey key;
    +		final JobID jobId = new JobID();
    +		final InetSocketAddress inetAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
    +		try (
    +			BlobClient client = new BlobClient(
    +				inetAddress, getBlobClientConfig())) {
    +
    +			key = client.uploadFile(jobId, new Path(newFolder.getPath()));
    +		}
    +
    +		final File file = getBlobServer().getFile(jobId, (PermanentBlobKey) key);
    +
    +		try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
    +			ZipEntry entry;
    +			while ((entry = zis.getNextEntry()) != null) {
    +				String fileName = entry.getName().replaceAll("/", "");
    --- End diff --
    
    `Path#getName()`?
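
    For illustration, a minimal sketch of that suggestion (a hedged sketch, assuming Flink's `org.apache.flink.core.fs.Path`, whose `getName()` returns the final path component):

        // hypothetical replacement for the replaceAll call above
        String fileName = new Path(entry.getName()).getName();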


---

[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5580
  
    I'm aware that the `RestClusterClient` currently submits jars directly to the blob service. Had this PR been opened half a year ago, I would've welcomed it with open arms.
    
    My point still stands, though: every line that is touched here would have to be rewritten once we do that, which may very well happen before 1.6.


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r171172025
  
    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java ---
    @@ -106,9 +106,9 @@
      * are created for the plan nodes, on the way back up, the nodes connect their predecessors.
      */
     public class JobGraphGenerator implements Visitor<PlanNode> {
    -	
    +
     	public static final String MERGE_ITERATION_AUX_TASKS_KEY = "compiler.merge-iteration-aux";
    -	
    --- End diff --
    
    Sorry for that. Forgot to disable stripping whitespaces on save. Will revert those changes.


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176662188
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
    @@ -267,101 +208,60 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log
     	// ------------------------------------------------------------------------
     
     	/**
    -	 * Asynchronous file copy process.
    -	 */
    -	private static class CopyProcess implements Callable<Path> {
    -
    -		private final Path filePath;
    -		private final Path cachedPath;
    -		private boolean executable;
    -
    -		public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
    -			this.filePath = new Path(e.filePath);
    -			this.executable = e.isExecutable;
    -			this.cachedPath = cachedPath;
    -		}
    -
    -		@Override
    -		public Path call() throws IOException {
    -			// let exceptions propagate. we can retrieve them later from
    -			// the future and report them upon access to the result
    -			copy(filePath, cachedPath, this.executable);
    -			return cachedPath;
    -		}
    -	}
    -
    -	/**
    -	 * If no task is using this file after 5 seconds, clear it.
    +	 * Asynchronous file copy process from blob server.
     	 */
    -	private static class DeleteProcess implements Runnable {
    +	private static class CopyFromBlobProcess implements Callable<Path> {
     
    -		private final Object lock;
    -		private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
    -
    -		private final String name;
    +		private final PermanentBlobKey blobKey;
    +		private final Path target;
    +		private final boolean directory;
    +		private final boolean executable;
     		private final JobID jobID;
    +		private final PermanentBlobService blobService;
     
    -		public DeleteProcess(Object lock, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries,
    --- End diff --
    
    Who's responsible for cleaning up the created local files?


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r183703858
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java ---
    @@ -0,0 +1,209 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.filecache;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.cache.DistributedCache;
    +import org.apache.flink.core.fs.FileStatus;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.blob.PermanentBlobKey;
    +import org.apache.flink.runtime.blob.PermanentBlobService;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
    +import org.apache.flink.util.FileUtils;
    +import org.apache.flink.util.IOUtils;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.File;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.ScheduledFuture;
    +import java.util.concurrent.TimeUnit;
    +import java.util.zip.ZipEntry;
    +import java.util.zip.ZipOutputStream;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Tests that {@link FileCache} can read zipped directories from BlobServer and properly cleans them after.
    + */
    +public class FileCacheDirectoriesTest {
    +
    +	private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
    +	+ "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
    +	+ "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n"
    +	+ "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n"
    +	+ "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n"
    +	+ "hohen Werke Sind herrlich wie am ersten Tag.\n"
    +	+ "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n"
    +	+ "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n"
    +	+ "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n"
    +	+ "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
    +	+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n"
    +	+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n"
    +	+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n"
    +	+ "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
    +
    +	@Rule
    +	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private FileCache fileCache;
    +
    +	private final PermanentBlobKey permanentBlobKey = new PermanentBlobKey();
    +
    +	private final PermanentBlobService blobService = new PermanentBlobService() {
    +		@Override
    +		public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
    +			if (key.equals(permanentBlobKey)) {
    +				final File zipArchive = temporaryFolder.newFile("zipArchive");
    +				try (ZipOutputStream zis = new ZipOutputStream(new FileOutputStream(zipArchive))) {
    +
    +					final ZipEntry zipEntry = new ZipEntry("cacheFile");
    +					zis.putNextEntry(zipEntry);
    +
    +					IOUtils.copyBytes(new ByteArrayInputStream(testFileContent.getBytes(StandardCharsets.UTF_8)), zis, false);
    +				}
    +				return zipArchive;
    +			} else {
    +				throw new IllegalArgumentException("This service contains only entry for " + permanentBlobKey);
    +			}
    +		}
    +
    +		@Override
    +		public void close() throws IOException {
    +
    +		}
    +	};
    +
    +	@Before
    +	public void setup() throws Exception {
    +		fileCache = new FileCache(new String[]{temporaryFolder.newFolder().getAbsolutePath()}, blobService);
    +	}
    +
    +	@After
    +	public void shutdown() {
    +		fileCache.shutdown();
    +	}
    +
    +	@Test
    +	public void testDirectoryDownloadedFromBlob() throws Exception {
    +		JobID jobID = new JobID();
    +		ExecutionAttemptID attemptID = new ExecutionAttemptID();
    +
    +		final String fileName = "test_file";
    +		// copy / create the file
    +		final DistributedCache.DistributedCacheEntry entry = new DistributedCache.DistributedCacheEntry(
    +			fileName,
    +			false,
    +			InstantiationUtil.serializeObject(permanentBlobKey),
    +			true);
    +		Future<Path> copyResult = fileCache.createTmpFile(fileName, entry, jobID, attemptID);
    +
    +		final Path dstPath = copyResult.get();
    +		final FileSystem fs = dstPath.getFileSystem();
    +		final FileStatus fileStatus = fs.getFileStatus(dstPath);
    +		assertTrue(fileStatus.isDir());
    +
    +		final Path cacheFile = new Path(dstPath, "cacheFile");
    +		assertTrue(fs.exists(cacheFile));
    +		final String actualContent = FileUtils.readFileUtf8(new File(cacheFile.getPath()));
    +		assertEquals(testFileContent, actualContent);
    +	}
    +
    +	@Test
    +	public void testDirectoryCleanUp() throws Exception {
    +		fileCache.shutdown();
    +		DeleteCapturingDirectScheduledExecutorService scheduledExecutorService =
    +			new DeleteCapturingDirectScheduledExecutorService();
    +
    +		final int cleanupInterval = 1000;
    +		fileCache = new FileCache(new String[]{temporaryFolder.newFolder().getAbsolutePath()}, blobService,
    +			scheduledExecutorService, cleanupInterval);
    +
    +		JobID jobID = new JobID();
    +		ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
    +		ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
    +
    +		final String fileName = "test_file";
    +		// copy / create the file
    +		final DistributedCache.DistributedCacheEntry entry = new DistributedCache.DistributedCacheEntry(
    +			fileName,
    +			false,
    +			InstantiationUtil.serializeObject(permanentBlobKey),
    +			true);
    +		Future<Path> copyResult = fileCache.createTmpFile(fileName, entry, jobID, attemptID1);
    +		fileCache.createTmpFile(fileName, entry, jobID, attemptID2);
    +
    +		final Path dstPath = copyResult.get();
    +		final FileSystem fs = dstPath.getFileSystem();
    +		final FileStatus fileStatus = fs.getFileStatus(dstPath);
    +		final Path cacheFile = new Path(dstPath, "cacheFile");
    +		assertTrue(fileStatus.isDir());
    +		assertTrue(fs.exists(cacheFile));
    +
    +		fileCache.releaseJob(jobID, attemptID1);
    +		// still should be available
    +		assertTrue(fileStatus.isDir());
    +		assertTrue(fs.exists(cacheFile));
    +
    +		fileCache.releaseJob(jobID, attemptID2);
    --- End diff --
    
    Are we at risk of race conditions here between this call and the second `createTmpFile` call? We never wait for the returned future.


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176662757
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java ---
    @@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable final JobID jobId, BlobKey.Bl
     	}
     
     	/**
    -	 * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)} helper.
    +	 * Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, Configuration, JobID, List)} helper.
     	 */
     	@Test
     	public void testUploadJarFilesHelper() throws Exception {
     		uploadJarFile(getBlobServer(), getBlobClientConfig());
     	}
     
    +	@Test
    +	public void testDirectoryUploading() throws IOException {
    +		final File newFolder = temporaryFolder.newFolder();
    +		final File file1 = File.createTempFile("pre", "suff", newFolder);
    +		FileUtils.writeStringToFile(file1, "Test content");
    --- End diff --
    
    We have a `writeUtf8` method somewhere in `FileUtils`.
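
    For illustration, a sketch of what that would look like here (assuming the method is `FileUtils.writeFileUtf8(File, String)`, as used elsewhere in this thread):

        // hedged sketch: replaces the commons-io writeStringToFile calls
        FileUtils.writeFileUtf8(file1, "Test content");
        FileUtils.writeFileUtf8(file2, "Test content 2");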


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176769129
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
    @@ -143,30 +160,23 @@ public void shutdown() {
     	/**
     	 * If the file doesn't exists locally, it will copy the file to the temp directory.
     	 *
    -	 * @param name  The name under which the file is registered.
     	 * @param entry The cache entry descriptor (path, executable flag)
     	 * @param jobID The ID of the job for which the file is copied.
     	 * @return The handle to the task that copies the file.
     	 */
    -	public Future<Path> createTmpFile(String name, DistributedCacheEntry entry, JobID jobID) {
    +	public Future<Path> createTmpFile(String name, DistributedCacheEntry entry, JobID jobID, ExecutionAttemptID executionId) {
     		synchronized (lock) {
    -			Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID);
    -			if (jobEntries == null) {
    -				jobEntries = new HashMap<String, Tuple4<Integer, File, Path, Future<Path>>>();
    -				entries.put(jobID, jobEntries);
    -			}
    +			Map<String, Future<Path>> jobEntries = entries.computeIfAbsent(jobID, k -> new HashMap<>());
    +			final Set<ExecutionAttemptID> refHolders = jobRefHolders.computeIfAbsent(jobID, id -> new HashSet<>());
    +			refHolders.add(executionId);
     
     			// tuple is (ref-count, parent-temp-dir, cached-file-path, copy-process)
    --- End diff --
    
    outdated


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by ifndef-SleePy <gi...@git.apache.org>.
Github user ifndef-SleePy commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r170804963
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---
    @@ -314,6 +315,9 @@ public static TaskExecutor startTaskManager(
     
     		TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
     
    +		final FileCache fileCache = new FileCache(taskManagerServicesConfiguration.getTmpDirPaths(),
    --- End diff --
    
    It seems that the `fileCache` here is not being used at all.


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179057166
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
    @@ -262,106 +216,120 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log
     		);
     	}
     
    +	public void releaseJob(JobID jobId, ExecutionAttemptID executionId) {
    +		checkNotNull(jobId);
    +
    +		synchronized (lock) {
    +			Set<ExecutionAttemptID> jobRefCounter = jobRefHolders.get(jobId);
    +
    +			if (jobRefCounter == null || jobRefCounter.isEmpty()) {
    +				LOG.warn("improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId " + jobId);
    +				return;
    +			}
    +
    +			jobRefCounter.remove(executionId);
    +			if (jobRefCounter.isEmpty()) {
    +				executorService.schedule(new DeleteProcess(jobId), cleanupInterval, TimeUnit.SECONDS);
    +			}
    +		}
    +	}
    +
     	// ------------------------------------------------------------------------
     	//  background processes
     	// ------------------------------------------------------------------------
     
     	/**
    -	 * Asynchronous file copy process.
    +	 * Asynchronous file copy process from blob server.
     	 */
    -	private static class CopyProcess implements Callable<Path> {
    +	private static class CopyFromBlobProcess implements Callable<Path> {
     
    -		private final Path filePath;
    -		private final Path cachedPath;
    -		private boolean executable;
    +		private final PermanentBlobKey blobKey;
    +		private final Path target;
    +		private final boolean isDirectory;
    +		private final boolean isExecutable;
    +		private final JobID jobID;
    +		private final PermanentBlobService blobService;
     
    -		public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
    -			this.filePath = new Path(e.filePath);
    -			this.executable = e.isExecutable;
    -			this.cachedPath = cachedPath;
    +		CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, Path target) {
    +			try {
    +				this.isExecutable = e.isExecutable;
    +				this.isDirectory = e.isZipped;
    +				this.jobID = jobID;
    +				this.blobService = blobService;
    +				this.blobKey = InstantiationUtil.deserializeObject(e.blobKey, Thread.currentThread().getContextClassLoader());
    +				this.target = target;
    +			} catch (Exception ex) {
    +				throw new RuntimeException(ex);
    +			}
     		}
     
     		@Override
     		public Path call() throws IOException {
    -			// let exceptions propagate. we can retrieve them later from
    -			// the future and report them upon access to the result
    -			copy(filePath, cachedPath, this.executable);
    -			return cachedPath;
    +			final File file = blobService.getFile(jobID, blobKey);
    +
    +			if (isDirectory) {
    +				try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
    +					ZipEntry entry;
    +					while ((entry = zis.getNextEntry()) != null) {
    +						String fileName = entry.getName();
    +						Path newFile = new Path(target, fileName);
    +						if (entry.isDirectory()) {
    +							target.getFileSystem().mkdirs(newFile);
    +						} else {
    +							try (FSDataOutputStream fsDataOutputStream = target.getFileSystem()
    +									.create(newFile, FileSystem.WriteMode.NO_OVERWRITE)) {
    +								IOUtils.copyBytes(zis, fsDataOutputStream, false);
    +							}
    +							//noinspection ResultOfMethodCallIgnored
    +							new File(newFile.getPath()).setExecutable(isExecutable);
    +						}
    +						zis.closeEntry();
    +					}
    +				}
    +				Files.delete(file.toPath());
    +				return target;
    +			} else {
    +				//noinspection ResultOfMethodCallIgnored
    +				file.setExecutable(isExecutable);
    +				return Path.fromLocalFile(file);
    +			}
    +
     		}
     	}
     
     	/**
     	 * If no task is using this file after 5 seconds, clear it.
     	 */
    -	private static class DeleteProcess implements Runnable {
    -
    -		private final Object lock;
    -		private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
    +	private class DeleteProcess implements Runnable {
     
    -		private final String name;
     		private final JobID jobID;
     
    -		public DeleteProcess(Object lock, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries,
    -								String name, JobID jobID) {
    -			this.lock = lock;
    -			this.entries = entries;
    -			this.name = name;
    +		DeleteProcess(JobID jobID) {
     			this.jobID = jobID;
     		}
     
     		@Override
     		public void run() {
     			try {
     				synchronized (lock) {
    -					Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID);
     
    -					if (jobEntries != null) {
    -						Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name);
    +					if (jobRefHolders.get(jobID).isEmpty()) {
    +						// abort the copy
    +						for (Future<Path> fileFuture : entries.get(jobID).values()) {
    +							fileFuture.cancel(true);
    +						}
     
    -						if (entry != null) {
    -							int count = entry.f0;
    -							if (count > 1) {
    -								// multiple references still
    -								entry.f0 = count - 1;
    -							}
    -							else {
    -								// we remove the last reference
    -								jobEntries.remove(name);
    -								if (jobEntries.isEmpty()) {
    -									entries.remove(jobID);
    -								}
    -
    -								// abort the copy
    -								entry.f3.cancel(true);
    -
    -								// remove the file
    -								File file = new File(entry.f2.toString());
    -								if (file.exists()) {
    -									if (file.isDirectory()) {
    -										FileUtils.deleteDirectory(file);
    -									}
    -									else if (!file.delete()) {
    -										LOG.error("Could not delete locally cached file " + file.getAbsolutePath());
    -									}
    -								}
    -
    -								// remove the job wide temp directory, if it is now empty
    -								File parent = entry.f1;
    -								if (parent.isDirectory()) {
    -									String[] children = parent.list();
    -									if (children == null || children.length == 0) {
    -										//noinspection ResultOfMethodCallIgnored
    -										parent.delete();
    -									}
    -								}
    -							}
    +						// remove the job wide temp directories
    +						for (File storageDirectory : storageDirectories) {
    +							File tempDir = new File(storageDirectory, jobID.toString());
    +							FileUtils.deleteDirectory(tempDir);
     						}
     					}
     				}
    -			}
    -			catch (IOException e) {
    +			} catch (IOException e) {
     				LOG.error("Could not delete file from local file cache.", e);
     			}
     		}
     	}
    +
    --- End diff --
    
    revert


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r171175263
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.filecache;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.cache.DistributedCache;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.blob.BlobServer;
    +import org.apache.flink.runtime.blob.PermanentBlobKey;
    +import org.apache.flink.runtime.blob.PermanentBlobService;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.base.Charsets;
    +import org.apache.flink.shaded.guava18.com.google.common.io.Files;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +import org.junit.runner.RunWith;
    +import org.mockito.Mock;
    +import org.mockito.runners.MockitoJUnitRunner;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.concurrent.Future;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +import static org.mockito.Mockito.when;
    +
    +/**
    + * Tests that {@link FileCache} can read files from {@link BlobServer}.
    + */
    +@RunWith(MockitoJUnitRunner.class)
    +public class FileCacheReadsFromBlobTest {
    +
    +	private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
    +		+ "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
    +		+ "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n"
    +		+ "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n"
    +		+ "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n"
    +		+ "hohen Werke Sind herrlich wie am ersten Tag.\n"
    +		+ "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n"
    +		+ "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n"
    +		+ "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n"
    +		+ "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
    +		+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n"
    +		+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n"
    +		+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n"
    +		+ "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
    +
    +	@Rule
    +	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private FileCache fileCache;
    +
    +	@Mock
    +	private PermanentBlobService blobService;
    +
    +	@Before
    +	public void setup() throws IOException {
    +		try {
    +			String[] tmpDirectories = new String[]{temporaryFolder.newFolder().getAbsolutePath()};
    +			fileCache = new FileCache(tmpDirectories, blobService);
    +		} catch (Exception e) {
    +			e.printStackTrace();
    +			fail("Cannot create FileCache: " + e.getMessage());
    +		}
    +	}
    +
    +	@After
    +	public void shutdown() {
    +		try {
    +			fileCache.shutdown();
    +		} catch (Exception e) {
    +			e.printStackTrace();
    +			fail("FileCache shutdown failed: " + e.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testFileDownloadedFromBlob() {
    +		try {
    +			JobID jobID = new JobID();
    +
    +			final PermanentBlobKey permanentBlobKey = new PermanentBlobKey();
    +			when(blobService.getFile(jobID, permanentBlobKey)).thenAnswer(inv -> {
    +					File f = temporaryFolder.newFile("cacheFile");
    +					try {
    +						Files.write(testFileContent, f, Charsets.UTF_8);
    +					} catch (Exception e) {
    +						e.printStackTrace();
    +						fail("Error initializing the test: " + e.getMessage());
    +					}
    +					return f;
    +				}
    +			);
    +
    +			final String fileName = "test_file";
    +			// copy / create the file
    +			final DistributedCache.DistributedCacheEntry entry = new DistributedCache.DistributedCacheEntry(
    +				fileName,
    +				false,
    +				true,
    +				InstantiationUtil.serializeObject(permanentBlobKey));
    +			Future<Path> copyResult = fileCache.createTmpFile(fileName, entry, jobID);
    +
    +			final Path dstPath = copyResult.get();
    +			final String actualContent =
    +				StringUtils.join(Files.readLines(new File(dstPath.toUri()), StandardCharsets.UTF_8), "\n");
    +			assertTrue(dstPath.getFileSystem().exists(dstPath));
    +			assertEquals(testFileContent, actualContent);
    +
    +		} catch (Exception e) {
    --- End diff --
    
    Just let the exception fail the test, i.e. no catching.
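
    For illustration, a hedged sketch of the suggested shape (hypothetical skeleton; `FileUtils.readFileUtf8` is used as a stand-in read helper here):

        @Test
        public void testFileDownloadedFromBlob() throws Exception {
            // no try/catch: JUnit reports any thrown exception as a test failure
            Future<Path> copyResult = fileCache.createTmpFile(fileName, entry, jobID);
            Path dstPath = copyResult.get();
            assertEquals(testFileContent, FileUtils.readFileUtf8(new File(dstPath.toUri())));
        }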


---

[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/5580
  
    I am not suggesting dropping uploading directories, but rather gradually switching to the blob server: leave the possibility to distribute files via DFS (and leave Python using it), and only drop it completely in 1.6.0.
    
    But of course I am open to suggestions.


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176767248
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java ---
    @@ -91,6 +93,7 @@ public void after() throws Exception{
     	}
     
     	@Test
    +	@Category(Flip6.class)
    --- End diff --
    
    revert?


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179071066
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java ---
    @@ -33,16 +38,16 @@
     
     import java.io.File;
     import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
     import java.util.concurrent.Future;
     
    -import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertEquals;
     import static org.junit.Assert.assertTrue;
    -import static org.junit.Assert.fail;
     
     /**
    - * Test delete process of {@link FileCache}. The local cache file should not be deleted why another task comes in 5 seconds.
    + * Tests that {@link FileCache} can read files from {@link BlobServer}.
      */
    -public class FileCacheDeleteValidationTest {
    +public class FileCacheReadsFromBlobTest {
    --- End diff --
    
    It is different in that this test uploads a single file, so it follows a slightly different path: the file is not zipped and is shipped directly from the BlobCache. In the case of directories there is an unzipped version whose lifecycle is managed by `FileCache`.
    
    If you think the test `FileCacheDirectoryTest#testDirectoryDownloadedFromBlob` is enough, though, I will remove it.


---

[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/5580
  
    @aljoscha Yes, that's right. The latest commit does not work because the Python API expects complete directories to be uploaded. I think this is quite important, because this way the uploaded Python code can be modularized and Python libs can be used; without directories, only single-file scripts could be executed, if I understood it correctly.
    
    The version from commit a774fb664deabb6fdb437546038fc9477d291eb1 works, but the Python API still uses DFS underneath the DistributedCache.


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176663553
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java ---
    @@ -57,89 +65,59 @@
     		+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n"
     		+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n"
     		+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n"
    -		+ "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.\n";
    +		+ "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
     
     	@Rule
     	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
     
     	private FileCache fileCache;
    -	private File f;
     
    -	@Before
    -	public void setup() throws IOException {
    -		String[] tmpDirectories = new String[]{temporaryFolder.newFolder().getAbsolutePath()};
    -		try {
    -			fileCache = new FileCache(tmpDirectories);
    -		}
    -		catch (Exception e) {
    -			e.printStackTrace();
    -			fail("Cannot create FileCache: " + e.getMessage());
    +	private final PermanentBlobKey permanentBlobKey = new PermanentBlobKey();
    +
    +	private final PermanentBlobService blobService = new PermanentBlobService() {
    +		@Override
    +		public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
    +			if (key.equals(permanentBlobKey)) {
    +				File f = temporaryFolder.newFile("cacheFile");
    +				FileUtils.writeFileUtf8(f, testFileContent);
    +				return f;
    +			} else {
    +				throw new IllegalArgumentException("This service contains only entry for " + permanentBlobKey);
    +			}
     		}
     
    -		f = temporaryFolder.newFile("cacheFile");
    -		try {
    -			Files.write(testFileContent, f, Charsets.UTF_8);
    -		}
    -		catch (Exception e) {
    -			e.printStackTrace();
    -			fail("Error initializing the test: " + e.getMessage());
    +		@Override
    +		public void close() throws IOException {
    +
     		}
    +	};
    +
    +	@Before
    +	public void setup() throws Exception {
    +		fileCache = new FileCache(new String[]{temporaryFolder.newFolder().getAbsolutePath()}, blobService);
     	}
     
     	@After
     	public void shutdown() {
    -		try {
    -			fileCache.shutdown();
    -		}
    -		catch (Exception e) {
    -			e.printStackTrace();
    -			fail("FileCache shutdown failed: " + e.getMessage());
    -		}
    +		fileCache.shutdown();
     	}
     
     	@Test
    -	public void testFileReuseForNextTask() {
    -		try {
    -			final JobID jobID = new JobID();
    -			final String fileName = "test_file";
    -
    -			final String filePath = f.toURI().toString();
    -
    -			// copy / create the file
    -			Future<Path> copyResult = fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), jobID);
    -			copyResult.get();
    -
    -			// get another reference to the file
    -			Future<Path> copyResult2 = fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), jobID);
    -
    -			// this should be available immediately
    -			assertTrue(copyResult2.isDone());
    -
    -			// delete the file
    -			fileCache.deleteTmpFile(fileName, jobID);
    -			// file should not yet be deleted
    -			assertTrue(fileCache.holdsStillReference(fileName, jobID));
    -
    -			// delete the second reference
    -			fileCache.deleteTmpFile(fileName, jobID);
    -			// file should still not be deleted, but remain for a bit
    -			assertTrue(fileCache.holdsStillReference(fileName, jobID));
    -
    -			fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), jobID);
    -			fileCache.deleteTmpFile(fileName, jobID);
    -
    -			// after a while, the file should disappear
    -			long deadline = System.currentTimeMillis() + 20000;
    -			do {
    -				Thread.sleep(5500);
    -			}
    -			while (fileCache.holdsStillReference(fileName, jobID) && System.currentTimeMillis() < deadline);
    -
    -			assertFalse(fileCache.holdsStillReference(fileName, jobID));
    -		}
    -		catch (Exception e) {
    -			e.printStackTrace();
    -			fail(e.getMessage());
    -		}
    +	public void testFileDownloadedFromBlob() throws Exception {
    +		JobID jobID = new JobID();
    +
    +		final String fileName = "test_file";
    +		// copy / create the file
    +		final DistributedCache.DistributedCacheEntry entry = new DistributedCache.DistributedCacheEntry(
    +			fileName,
    +			false,
    +			InstantiationUtil.serializeObject(permanentBlobKey));
    +		Future<Path> copyResult = fileCache.createTmpFile(fileName, entry, jobID);
    +
    +		final Path dstPath = copyResult.get();
    +		final String actualContent =
    +			StringUtils.join(Files.readLines(new File(dstPath.toUri()), StandardCharsets.UTF_8), "\n");
    --- End diff --
    
    use `Files.readLines(new File(dstPath.toUri()), StandardCharsets.UTF_8).stream().collect(Collectors.joining("\n"))` instead?
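
    For completeness, a hedged sketch of that one-liner in context (assuming the shaded Guava `Files.readLines(File, Charset)` the test already imports, plus `java.util.stream.Collectors`):

        // readLines returns a List<String>, so stream-and-join replaces StringUtils.join
        final String actualContent = Files
            .readLines(new File(dstPath.toUri()), StandardCharsets.UTF_8)
            .stream()
            .collect(Collectors.joining("\n"));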


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179048742
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---
    @@ -26,6 +26,7 @@
     import org.apache.flink.api.common.JobExecutionResult;
     import org.apache.flink.api.common.JobID;
     import org.apache.flink.api.common.Plan;
    +import org.apache.flink.api.common.cache.DistributedCache;
     import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
    --- End diff --
    
    This import should now be unnecessary, or the change should be reverted.


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179056758
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
    @@ -262,106 +216,120 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log
     		);
     	}
     
    +	public void releaseJob(JobID jobId, ExecutionAttemptID executionId) {
    +		checkNotNull(jobId);
    +
    +		synchronized (lock) {
    +			Set<ExecutionAttemptID> jobRefCounter = jobRefHolders.get(jobId);
    +
    +			if (jobRefCounter == null || jobRefCounter.isEmpty()) {
    +				LOG.warn("improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId " + jobId);
    +				return;
    +			}
    +
    +			jobRefCounter.remove(executionId);
    --- End diff --
    
    Shouldn't we also remove the entries? Otherwise the `entries` map will continue to grow until the TM either shuts down or crashes with an OOM error.
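
    For illustration, a hedged sketch of one possible fix (hypothetical placement: inside `DeleteProcess#run()` under the same `lock`, after the temp directories have been deleted):

        // drop the bookkeeping for this job so neither map grows unboundedly
        entries.remove(jobID);
        jobRefHolders.remove(jobID);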


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r185517079
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java ---
    @@ -0,0 +1,209 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.filecache;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.cache.DistributedCache;
    +import org.apache.flink.core.fs.FileStatus;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.blob.PermanentBlobKey;
    +import org.apache.flink.runtime.blob.PermanentBlobService;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
    +import org.apache.flink.util.FileUtils;
    +import org.apache.flink.util.IOUtils;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.File;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.ScheduledFuture;
    +import java.util.concurrent.TimeUnit;
    +import java.util.zip.ZipEntry;
    +import java.util.zip.ZipOutputStream;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Tests that {@link FileCache} can read zipped directories from BlobServer and properly cleans them after.
    + */
    +public class FileCacheDirectoriesTest {
    +
    +	private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
    +	+ "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
    +	+ "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n"
    +	+ "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n"
    +	+ "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n"
    +	+ "hohen Werke Sind herrlich wie am ersten Tag.\n"
    +	+ "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n"
    +	+ "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n"
    +	+ "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n"
    +	+ "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
    +	+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n"
    +	+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n"
    +	+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n"
    +	+ "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
    +
    +	@Rule
    +	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private FileCache fileCache;
    +
    +	private final PermanentBlobKey permanentBlobKey = new PermanentBlobKey();
    +
    +	private final PermanentBlobService blobService = new PermanentBlobService() {
    +		@Override
    +		public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
    +			if (key.equals(permanentBlobKey)) {
    +				final File zipArchive = temporaryFolder.newFile("zipArchive");
    +				try (ZipOutputStream zis = new ZipOutputStream(new FileOutputStream(zipArchive))) {
    +
    +					final ZipEntry zipEntry = new ZipEntry("cacheFile");
    +					zis.putNextEntry(zipEntry);
    +
    +					IOUtils.copyBytes(new ByteArrayInputStream(testFileContent.getBytes(StandardCharsets.UTF_8)), zis, false);
    +				}
    +				return zipArchive;
    +			} else {
    +				throw new IllegalArgumentException("This service contains only entry for " + permanentBlobKey);
    +			}
    +		}
    +
    +		@Override
    +		public void close() throws IOException {
    +
    +		}
    +	};
    +
    +	@Before
    +	public void setup() throws Exception {
    +		fileCache = new FileCache(new String[]{temporaryFolder.newFolder().getAbsolutePath()}, blobService);
    +	}
    +
    +	@After
    +	public void shutdown() {
    +		fileCache.shutdown();
    +	}
    +
    +	@Test
    +	public void testDirectoryDownloadedFromBlob() throws Exception {
    +		JobID jobID = new JobID();
    +		ExecutionAttemptID attemptID = new ExecutionAttemptID();
    +
    +		final String fileName = "test_file";
    +		// copy / create the file
    +		final DistributedCache.DistributedCacheEntry entry = new DistributedCache.DistributedCacheEntry(
    +			fileName,
    +			false,
    +			InstantiationUtil.serializeObject(permanentBlobKey),
    +			true);
    +		Future<Path> copyResult = fileCache.createTmpFile(fileName, entry, jobID, attemptID);
    +
    +		final Path dstPath = copyResult.get();
    +		final FileSystem fs = dstPath.getFileSystem();
    +		final FileStatus fileStatus = fs.getFileStatus(dstPath);
    +		assertTrue(fileStatus.isDir());
    +
    +		final Path cacheFile = new Path(dstPath, "cacheFile");
    +		assertTrue(fs.exists(cacheFile));
    +		final String actualContent = FileUtils.readFileUtf8(new File(cacheFile.getPath()));
    +		assertEquals(testFileContent, actualContent);
    +	}
    +
    +	@Test
    +	public void testDirectoryCleanUp() throws Exception {
    +		fileCache.shutdown();
    +		DeleteCapturingDirectScheduledExecutorService scheduledExecutorService =
    +			new DeleteCapturingDirectScheduledExecutorService();
    +
    +		final int cleanupInterval = 1000;
    +		fileCache = new FileCache(new String[]{temporaryFolder.newFolder().getAbsolutePath()}, blobService,
    +			scheduledExecutorService, cleanupInterval);
    +
    +		JobID jobID = new JobID();
    +		ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
    +		ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
    +
    +		final String fileName = "test_file";
    +		// copy / create the file
    +		final DistributedCache.DistributedCacheEntry entry = new DistributedCache.DistributedCacheEntry(
    +			fileName,
    +			false,
    +			InstantiationUtil.serializeObject(permanentBlobKey),
    +			true);
    +		Future<Path> copyResult = fileCache.createTmpFile(fileName, entry, jobID, attemptID1);
    +		fileCache.createTmpFile(fileName, entry, jobID, attemptID2);
    +
    +		final Path dstPath = copyResult.get();
    +		final FileSystem fs = dstPath.getFileSystem();
    +		final FileStatus fileStatus = fs.getFileStatus(dstPath);
    +		final Path cacheFile = new Path(dstPath, "cacheFile");
    +		assertTrue(fileStatus.isDir());
    +		assertTrue(fs.exists(cacheFile));
    +
    +		fileCache.releaseJob(jobID, attemptID1);
    +		// still should be available
    +		assertTrue(fileStatus.isDir());
    +		assertTrue(fs.exists(cacheFile));
    +
    +		fileCache.releaseJob(jobID, attemptID2);
    --- End diff --
    
    We are not, because the `DirectExecutorService` is synchronous for all non-scheduled tasks. The only scheduled task is the `DeleteProcess`, though, which we capture and run manually.
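
    For illustration, a minimal sketch of that capture trick (hedged; it mirrors the `DeleteCapturingDirectScheduledExecutorService` used in the test above, whose exact body is not shown in this thread):

        static class DeleteCapturingDirectScheduledExecutorService extends DirectScheduledExecutorService {

            Runnable lastDeleteProcess;

            @Override
            public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
                // capture instead of scheduling, so the test can trigger the delete deterministically
                lastDeleteProcess = command;
                return null;
            }
        }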


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179060408
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java ---
    @@ -0,0 +1,182 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.filecache;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.cache.DistributedCache;
    +import org.apache.flink.core.fs.FileStatus;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.blob.PermanentBlobKey;
    +import org.apache.flink.runtime.blob.PermanentBlobService;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.util.FileUtils;
    +import org.apache.flink.util.IOUtils;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.File;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.concurrent.Future;
    +import java.util.zip.ZipEntry;
    +import java.util.zip.ZipOutputStream;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Tests that {@link FileCache} can read zipped directories from BlobServer and properly cleans them after.
    + */
    +public class FileCacheDirectoriesTest {
    +
    +	private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
    +	+ "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
    +	+ "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n"
    +	+ "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n"
    +	+ "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n"
    +	+ "hohen Werke Sind herrlich wie am ersten Tag.\n"
    +	+ "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n"
    +	+ "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n"
    +	+ "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n"
    +	+ "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
    +	+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n"
    +	+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n"
    +	+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n"
    +	+ "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
    +
    +	@Rule
    +	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private FileCache fileCache;
    +
    +	private final PermanentBlobKey permanentBlobKey = new PermanentBlobKey();
    +
    +	private final PermanentBlobService blobService = new PermanentBlobService() {
    +		@Override
    +		public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
    +			if (key.equals(permanentBlobKey)) {
    +				final File zipArchive = temporaryFolder.newFile("zipArchive");
    +				try (ZipOutputStream zis = new ZipOutputStream(new FileOutputStream(zipArchive))) {
    +
    +					final ZipEntry zipEntry = new ZipEntry("cacheFile");
    +					zis.putNextEntry(zipEntry);
    +
    +					IOUtils.copyBytes(new ByteArrayInputStream(testFileContent.getBytes(StandardCharsets.UTF_8)), zis, false);
    +				}
    +				return zipArchive;
    +			} else {
    +				throw new IllegalArgumentException("This service contains only entry for " + permanentBlobKey);
    +			}
    +		}
    +
    +		@Override
    +		public void close() throws IOException {
    +
    +		}
    +	};
    +
    +	@Before
    +	public void setup() throws Exception {
    +		fileCache = new FileCache(new String[]{temporaryFolder.newFolder().getAbsolutePath()}, blobService);
    +	}
    +
    +	@After
    +	public void shutdown() {
    +		fileCache.shutdown();
    +	}
    +
    +	@Test
    +	public void testDirectoryDownloadedFromBlob() throws Exception {
    +		JobID jobID = new JobID();
    +		ExecutionAttemptID attemptID = new ExecutionAttemptID();
    +
    +		final String fileName = "test_file";
    +		// copy / create the file
    +		final DistributedCache.DistributedCacheEntry entry = new DistributedCache.DistributedCacheEntry(
    +			fileName,
    +			false,
    +			InstantiationUtil.serializeObject(permanentBlobKey),
    +			true);
    +		Future<Path> copyResult = fileCache.createTmpFile(fileName, entry, jobID, attemptID);
    +
    +		final Path dstPath = copyResult.get();
    +		final FileSystem fs = dstPath.getFileSystem();
    +		final FileStatus fileStatus = fs.getFileStatus(dstPath);
    +		assertTrue(fileStatus.isDir());
    +
    +		final Path cacheFile = new Path(dstPath, "cacheFile");
    +		assertTrue(fs.exists(cacheFile));
    +		final String actualContent = FileUtils.readFileUtf8(new File(cacheFile.getPath()));
    +		assertEquals(testFileContent, actualContent);
    +	}
    +
    +	@Test
    +	public void testDirectoryCleanUp() throws Exception {
    +		JobID jobID = new JobID();
    +		ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
    +		ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
    +
    +		final String fileName = "test_file";
    +		// copy / create the file
    +		final DistributedCache.DistributedCacheEntry entry = new DistributedCache.DistributedCacheEntry(
    +			fileName,
    +			false,
    +			InstantiationUtil.serializeObject(permanentBlobKey),
    +			true);
    +		Future<Path> copyResult = fileCache.createTmpFile(fileName, entry, jobID, attemptID1);
    +		fileCache.createTmpFile(fileName, entry, jobID, attemptID2);
    +
    +		final Path dstPath = copyResult.get();
    +		final FileSystem fs = dstPath.getFileSystem();
    +		final FileStatus fileStatus = fs.getFileStatus(dstPath);
    +		final Path cacheFile = new Path(dstPath, "cacheFile");
    +		assertTrue(fileStatus.isDir());
    +		assertTrue(fs.exists(cacheFile));
    +
    +		fileCache.releaseJob(jobID, attemptID1);
    +		// still should be available
    +		assertTrue(fileStatus.isDir());
    +		assertTrue(fs.exists(cacheFile));
    +
    +		fileCache.releaseJob(jobID, attemptID2);
    +		// still should be available, file will be deleted after 5 seconds
    +		assertTrue(fileStatus.isDir());
    +		assertTrue(fs.exists(cacheFile));
    +
    +		// after a while, the file should disappear
    +		long deadline = System.currentTimeMillis() + 10000;
    +		do {
    +			Thread.sleep(5500);
    --- End diff --
    
    well, I'd rather not blow 5-10 seconds on a single test.
    
    I would suggest shutting down the FileCache instead. If the delete process was triggered, it should still delete the file.
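    
    For illustration, roughly something like this (just a sketch, reusing the names from the test above and assuming `shutdown()` drains the pending delete tasks):
    
    ```java
    fileCache.releaseJob(jobID, attemptID1);
    fileCache.releaseJob(jobID, attemptID2);
    
    // instead of sleeping, shut the cache down; if the delete process was
    // already scheduled, shutdown should run it and the directory is gone
    fileCache.shutdown();
    
    assertFalse(fs.exists(dstPath));
    assertFalse(fs.exists(cacheFile));
    ```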


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176659407
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -159,7 +162,7 @@ public ExecutionConfig getConfig() {
     	* Get the list of cached files that were registered for distribution among the task managers.
     	*/
     	public List<Tuple2<String, DistributedCache.DistributedCacheEntry>> getCachedFiles() {
    -		return cacheFile;
    +		return cacheFile.entrySet().stream().map(e -> Tuple2.of(e.getKey(), e.getValue())).collect(Collectors.toList());
    --- End diff --
    
    if the mapping is never used, there's no value in using a map in the first place.
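    
    i.e. something like this (a sketch, assuming the key-based lookup is really never needed anywhere):
    
    ```java
    // keep the registrations as a list from the start
    private final List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile = new ArrayList<>();
    
    public List<Tuple2<String, DistributedCache.DistributedCacheEntry>> getCachedFiles() {
        return cacheFile;
    }
    ```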


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176661496
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
    @@ -267,101 +208,60 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log
     	// ------------------------------------------------------------------------
     
     	/**
    -	 * Asynchronous file copy process.
    -	 */
    -	private static class CopyProcess implements Callable<Path> {
    -
    -		private final Path filePath;
    -		private final Path cachedPath;
    -		private boolean executable;
    -
    -		public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
    -			this.filePath = new Path(e.filePath);
    -			this.executable = e.isExecutable;
    -			this.cachedPath = cachedPath;
    -		}
    -
    -		@Override
    -		public Path call() throws IOException {
    -			// let exceptions propagate. we can retrieve them later from
    -			// the future and report them upon access to the result
    -			copy(filePath, cachedPath, this.executable);
    -			return cachedPath;
    -		}
    -	}
    -
    -	/**
    -	 * If no task is using this file after 5 seconds, clear it.
    +	 * Asynchronous file copy process from blob server.
     	 */
    -	private static class DeleteProcess implements Runnable {
    +	private static class CopyFromBlobProcess implements Callable<Path> {
     
    -		private final Object lock;
    -		private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
    -
    -		private final String name;
    +		private final PermanentBlobKey blobKey;
    +		private final Path target;
    +		private final boolean directory;
    --- End diff --
    
    -> `isDirectory`


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r183700946
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
    @@ -488,13 +494,36 @@ public void addJar(Path jar) {
     		return userJars;
     	}
     
    +	/**
    +	 * Adds the path of a custom file required to run the job on a task manager.
    +	 *
    +	 * @param file
    --- End diff --
    
    missing parameter docs for `name`
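    
    e.g. (the description text below is only a suggestion):
    
    ```java
    /**
     * Adds the path of a custom file required to run the job on a task manager.
     *
     * @param file the path of the file
     * @param name the name under which the file is registered
     */
    ```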


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5580


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179056079
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
    @@ -262,106 +216,120 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log
     		);
     	}
     
    +	public void releaseJob(JobID jobId, ExecutionAttemptID executionId) {
    +		checkNotNull(jobId);
    +
    +		synchronized (lock) {
    +			Set<ExecutionAttemptID> jobRefCounter = jobRefHolders.get(jobId);
    +
    +			if (jobRefCounter == null || jobRefCounter.isEmpty()) {
    +				LOG.warn("improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId " + jobId);
    +				return;
    +			}
    +
    +			jobRefCounter.remove(executionId);
    +			if (jobRefCounter.isEmpty()) {
    +				executorService.schedule(new DeleteProcess(jobId), cleanupInterval, TimeUnit.SECONDS);
    +			}
    +		}
    +	}
    +
     	// ------------------------------------------------------------------------
     	//  background processes
     	// ------------------------------------------------------------------------
     
     	/**
    -	 * Asynchronous file copy process.
    +	 * Asynchronous file copy process from blob server.
     	 */
    -	private static class CopyProcess implements Callable<Path> {
    +	private static class CopyFromBlobProcess implements Callable<Path> {
     
    -		private final Path filePath;
    -		private final Path cachedPath;
    -		private boolean executable;
    +		private final PermanentBlobKey blobKey;
    +		private final Path target;
    +		private final boolean isDirectory;
    +		private final boolean isExecutable;
    +		private final JobID jobID;
    +		private final PermanentBlobService blobService;
     
    -		public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
    -			this.filePath = new Path(e.filePath);
    -			this.executable = e.isExecutable;
    -			this.cachedPath = cachedPath;
    +		CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, Path target) {
    +			try {
    +				this.isExecutable = e.isExecutable;
    +				this.isDirectory = e.isZipped;
    +				this.jobID = jobID;
    +				this.blobService = blobService;
    +				this.blobKey = InstantiationUtil.deserializeObject(e.blobKey, Thread.currentThread().getContextClassLoader());
    +				this.target = target;
    +			} catch (Exception ex) {
    --- End diff --
    
    the constructor signature should be modified instead
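    
    i.e. let the checked exceptions propagate (sketch):
    
    ```java
    CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, Path target)
            throws IOException, ClassNotFoundException {
        this.isExecutable = e.isExecutable;
        this.isDirectory = e.isZipped;
        this.jobID = jobID;
        this.blobService = blobService;
        // deserializeObject declares IOException and ClassNotFoundException,
        // so no try/catch wrapping is needed here
        this.blobKey = InstantiationUtil.deserializeObject(e.blobKey, Thread.currentThread().getContextClassLoader());
        this.target = target;
    }
    ```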


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179050736
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java ---
    @@ -37,18 +39,31 @@
      */
     @Public
     public class DistributedCache {
    -	
    -	public static class DistributedCacheEntry {
    -		
    +
    +	public static class DistributedCacheEntry implements Serializable {
    +
     		public String filePath;
     		public Boolean isExecutable;
    -		
    -		public DistributedCacheEntry(String filePath, Boolean isExecutable){
    +		public Boolean isZipped;
    --- End diff --
    
    use a primitive instead?
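    
    i.e. (unless a `null` state carries meaning here):
    
    ```java
    public boolean isZipped;
    ```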


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179053531
  
    --- Diff: flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.tests;
    +
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import org.apache.commons.lang3.StringUtils;
    +
    +import java.io.File;
    +import java.nio.file.Files;
    +import java.nio.file.Paths;
    +
    +import static java.util.Collections.singletonList;
    +
    +/**
    + * End-to-end test program for verifying that files are distributed via the BlobServer and are later accessible
    + * through the DistributedCache. We verify this by uploading a file and later accessing it in a map function. To be
    + * sure we read the version from the cache, we delete the initial file.
    + */
    +public class DistributedCacheViaBlobTestProgram {
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		final ParameterTool params = ParameterTool.fromArgs(args);
    +
    +		final String fileContent = params.getRequired("content");
    +		final String tempDir = params.getRequired("tempDir");
    +
    +		final File tempFile = File.createTempFile("temp", null, new File(tempDir));
    +		Files.write(tempFile.toPath(), singletonList(fileContent));
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    --- End diff --
    
    explicitly set the parallelism to `1`?
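    
    i.e.:
    
    ```java
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    ```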


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179054489
  
    --- Diff: flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.tests;
    +
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import org.apache.commons.lang3.StringUtils;
    +
    +import java.io.File;
    +import java.nio.file.Files;
    +import java.nio.file.Paths;
    +
    +import static java.util.Collections.singletonList;
    +
    +/**
    + * End-to-end test program for verifying that files are distributed via the BlobServer and are later accessible
    + * through the DistributedCache. We verify this by uploading a file and later accessing it in a map function. To be
    + * sure we read the version from the cache, we delete the initial file.
    + */
    +public class DistributedCacheViaBlobTestProgram {
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		final ParameterTool params = ParameterTool.fromArgs(args);
    +
    +		final String fileContent = params.getRequired("content");
    +		final String tempDir = params.getRequired("tempDir");
    --- End diff --
    
    I'm wondering whether we shouldn't just pass in a file instead.
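    
    Something like this (sketch; the parameter name is made up):
    
    ```java
    // take an already prepared file as a parameter instead of creating
    // a temp file with the given content inside the program
    final String inputFile = params.getRequired("inputFile");
    ```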


---

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r171184258
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -1829,6 +1830,28 @@ public void registerCachedFile(String filePath, String name) {
     	 * @param executable flag indicating whether the file should be executable
     	 */
     	public void registerCachedFile(String filePath, String name, boolean executable) {
    -		this.cacheFile.add(new Tuple2<>(name, new DistributedCache.DistributedCacheEntry(filePath, executable)));
    +		registerCachedFile(filePath, name, executable, false);
     	}
    +
    +	/**
    +	 * Registers a file at the distributed cache under the given name. The file will be accessible
    +	 * from any user-defined function in the (distributed) runtime under a local path. If upload is true, the file
    +	 * will be distributed via the {@link BlobServer}; otherwise the path must point to a local file (as long as all
    +	 * relevant workers have access to it) or to a file in a distributed file system. The runtime will copy the
    +	 * file temporarily to a local cache, if needed.
    +	 *
    +	 * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
    +	 * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
    +	 * {@link org.apache.flink.api.common.cache.DistributedCache} via
    +	 * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
    +	 *
    +	 * @param filePath The path of the file
    +	 * @param name The name under which the file is registered.
    +	 * @param executable flag indicating whether the file should be executable
    +	 * @param upload flag indicating if the file should be distributed via BlobServer
    +	 */
    +	public void registerCachedFile(String filePath, String name, boolean executable, boolean upload) {
    --- End diff --
    
    send everything through the blob service
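    
    i.e. drop the `upload` flag entirely and distribute every registered file via the blob server (sketch; how the entry is stored internally is left open):
    
    ```java
    public void registerCachedFile(String filePath, String name, boolean executable) {
        // no user-facing 'upload' flag: every registered file goes through
        // the blob server when the job is submitted
        this.cacheFile.add(new Tuple2<>(name, new DistributedCache.DistributedCacheEntry(filePath, executable)));
    }
    ```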


---