You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by NicoK <gi...@git.apache.org> on 2017/07/18 13:49:37 UTC

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/4358

    [FLINK-7068][blob] change BlobService sub-classes for permanent and transient BLOBs

    This splits up the BLOB store classes into two different use cases which is the biggest change of FLIP-19:
    * a `PermanentBlobStore` should resemble use cases for BLOBs that are permanently stored for a job's life time (HA and non-HA).
    * a `TransientBlobStore` should reflect BLOB offloading for logs, RPC, etc. which even does not have to be reflected by files.
    
    The `BlobServer` implements both while the `BlobCache` provides access to individual implementations of the two. HA files may only be uploaded by the `BlobServer` (and the (now) only publicly useful method of `BlobClient` to upload jar files). All other accesses should go through the two BLOB store classes.
    
    This PR is based upon #4238 in a series to implement FLINK-6916. It contains some development history and should be squashed.
    
    - [X] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [X] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [X] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-6916-7068

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4358.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4358
    
----
commit 1f86efd013a81e84ba1556de0a04e4ac70229f79
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-06-22T15:31:17Z

    [FLINK-7053][blob] remove code duplication in BlobClientSslTest
    
    This lets BlobClientSslTest extend BlobClientTest as most of its implementation
    came from there and was simply copied.

commit ff083e850edb8f5f383f54f19519116e10308d61
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-06-23T09:40:34Z

    [FLINK-7053][blob] verify some of the buffers returned by GET

commit e138569019f5c75b70a21085e4829cb6cd1e93bc
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-06-23T10:04:10Z

    [FLINK-7053][blob] use TemporaryFolder for local BLOB dir in unit tests
    
    This replaces the use of some temporary directory where it is not guaranteed
    that it will be deleted after the test.

commit 70f06d9b38c5d8afb50b6c18b7b3a4e07e2bb3da
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-06-21T12:45:31Z

    [FLINK-7054][blob] remove LibraryCacheManager#getFile()
    
    This was only used in tests where it is avoidable but if used anywhere else, it
    may have caused cleanup issues.

commit 5463f65aeb3ed519b51ccc78abfcc003bf02b3f8
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-07-16T21:13:56Z

    [FLINK-7054][hotfix] fix a checkstyle error

commit 12052c0b8c8b869f9b6bd6f9d53c9ed6e362c631
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-06-21T14:14:15Z

    [FLINK-7055][blob] refactor getURL() to the more generic getFile()
    
    The fact that we always returned URL objects is a relic of the BlobServer's only
    use for URLClassLoader. Since we'd like to extend its use, returning File
    objects instead is more generic.

commit 9dd7ba652ff1b09fb5bf44e8b6b5b885a56873ab
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-06-21T16:04:43Z

    [FLINK-7056][blob] add API to allow job-related BLOBs to be stored

commit 90f779ba66728979977bd794e8f68b207471dbc2
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-06-23T17:17:07Z

    [FLINK-7056][blob] refactor the new API for job-related BLOBs
    
    For a cleaner API, instead of having a nullable jobId parameter, use two methods:
    one for job-related BLOBs, another for job-unrelated ones.

commit 6d1fda80c6f9869676030fd86177bbc7e981b1f9
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-07-16T21:22:12Z

    [FLINK-7056][hotfix] fix a checkstyle error

commit cdbb0fb32052c8163bc83df151a0c8354846e4dc
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-06-27T16:29:44Z

    [FLINK-7057][blob] move ref-counting from the LibraryCacheManager to the BlobCache
    
    Also change from BlobKey-based ref-counting to job-based ref-counting which is
    simpler and the mode we want to use from now on. Deferred cleanup (as before)
    is currently not implemented yet (TODO).
    At the BlobServer, no ref-counting will be used but the cleanup will happen
    when the job enters a final state (TODO).

commit 8617c9e8ffbdece8d02815ef71dc09f2d764453a
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-06-28T09:31:39Z

    [FLINK-7057][blob] change to a cleaner API for BlobService#registerJob()

commit 20cf9bf1fd2b382b6d45dc7b15c7be9bed7f5f16
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-06-28T12:09:11Z

    [FLINK-7057][blob] implement deferred cleanup at the BlobCache
    
    Whenever a job is not referenced at the BlobCache anymore, we set a TTL and let
    the cleanup task remove it when this is hit and the task is run. For now, this
    means that a BLOB will be retained at most
    (2 * ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) seconds after not
    being referenced anymore. We do this so that a recovery still has the chance to
    use existing files rather than to download them again.

commit 270f3957b6d5e9061d528158a86eb1837ad2786c
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-06-28T15:17:06Z

    [FLINK-7057][blob] integrate cleanup of job-related JARs from the BlobServer
    
    TODO: an integration test that verifies that this is actually done when desired
    and not performed when not, e.g. if the job did not reach a final execution
    state

commit 39300a32c90c82f8044cfd6d5fcaf1b83b250c87
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-06-30T12:52:19Z

    [FLINK-7057][tests] extract FailingBlockingInvokable from CoordinatorShutdownTest

commit 56113349799669326074d269d6c07bb19bd45788
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-06-30T12:56:14Z

    [FLINK-7057][blob] add an integration test for the BlobServer cleanup
    
    This ensures that BLOB files are actually deleted when a job enters a final
    state.

commit 93861f1f624c5b514fcdd041b592e0c40c66d103
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-07-03T09:25:29Z

    [FLINK-7057][tests] refrain from catching an exception just to fail the test
    
    removes code like this in the BLOB store unit tests:
    
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }

commit 80a05142a68f8a3d0aaf1d447ab87a1e506364a4
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-07-03T11:45:33Z

    [FLINK-7057][blob] fix BlobServer#cleanupJob() being too eager
    
    Instead of deleting the job's directory, it was deleting the parent storage
    directory.

commit de4ca2a7c676e7e1a82172ce680ae5a5f6693df4
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-07-03T15:03:50Z

    [FLINK-7057][blob] fix BlobServer cleanup integration
    
    * the test did not check the correct directories for cleanup
    * the test did not honour the test timeout

commit dc58fa5d5ffc299b493dc1a78f26aa1914036151
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-07-03T15:11:18Z

    [FLINK-7057][blob] test and fix BlobServer cleanup for a failed job submission

commit d1de87bfd34c5bb1f2dde0402bcceac6013c150b
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-07-04T07:55:29Z

    [FLINK-7057][blob] rework the LibraryCacheManager API
    
    Since ref-counting has moved to the BlobCache, the BlobLibraryCacheManager is
    just a thin wrapper to get a user class loader by retrieving BLOBs from the
    BlobCache/BlobServer. Therefore, move the job-registration/-release out of it,
    too, and restrict its use to the task manager where the BlobCache is used (on
    the BlobServer, jobs do not need registration since they are only used once and
    will be deleted when they enter a final state).
    
    This makes the BlobServer and BlobCache instances available at the JobManager
    and TaskManager instances, respectively, also enabling future use cases outside
    of the LibraryCacheManager.

commit b62f1f640e33126523eaaa7bd51f0b24962c67fd
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-07-04T09:50:07Z

    [hotfix] increase Scala checkstyle maxParameters to 20

commit 5e9a9a7cd779a8e0d28947bcfa64cc16c02857f2
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-07-04T09:51:28Z

    [FLINK-7057][blob] address PR comments

commit 5fa960676281fc1473105c849fbce9cb22cacacb
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-07-04T12:41:18Z

    [FLINK-7057][blob] fix JobManagerLeaderElectionTest

commit a6ca6812af7b4130e77775fd3896f3f44a357334
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-07-04T20:49:12Z

    [FLINK-7057][blob] re-introduce some ref-counting for BlobLibraryCacheManager
    
    Apparently, we do need to return the same ClassLoader for different (parallel)
    tasks of a job running on the same task manager. Therefore, keep the initial
    task registration implementation that was removed with
    8331fbb208d975e0c1ec990344c14315ea08dd4a and only adapt it here. This also
    restores some tests and adds new combinations not tested before.

commit da1be581529813ec359f1ac09a00bc564a3812dd
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-07-06T07:38:17Z

    [FLINK-7057][blob] address PR comments

commit b02807e75cea960fd2c89e49656be2e6f6e32394
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-07-10T11:40:00Z

    [FLINK-7057][tests] fix (manual/ignored) BlobCacheCleanupTest#testJobDeferredCleanup()

commit 0f9f7d7f90497708cd2589b34573f10be0b9d331
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-07-16T21:30:43Z

    [FLINK-7057][hotfix] fix a checkstyle error

commit 742488145b6e5ae94f29371915b2d5b08e4ff655
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-07-11T09:19:20Z

    [FLINK-7068][blob] start introducing a new BLOB storage abstraction
    
    This is incomplete and may not compile and/or run tests successfully yet.

commit de88cede820418628779957af76a9d49b535c26b
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-07-11T10:29:38Z

    [FLINK-7068][blob] remove BlobView from TransientBlobCache
    
    The transient BLOB cache is not supposed to work with the HA store since it only
    serves non-HA files.

commit 99edd8966e3032110be8d2e240440da13dd2f655
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-07-11T13:04:00Z

    [FLINK-7068][blob] remove unnecessary use of BlobClient

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r137303136
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java ---
    @@ -18,61 +18,31 @@
     
     package org.apache.flink.runtime.blob;
     
    -import org.apache.flink.api.common.JobID;
    -
     import java.io.Closeable;
    -import java.io.File;
    -import java.io.IOException;
     
     /**
      * A simple store and retrieve binary large objects (BLOBs).
      */
     public interface BlobService extends Closeable {
     
     	/**
    -	 * Returns the path to a local copy of the (job-unrelated) file associated with the provided
    -	 * blob key.
    -	 *
    -	 * @param key blob key associated with the requested file
    -	 * @return The path to the file.
    -	 * @throws java.io.FileNotFoundException when the path does not exist;
    -	 * @throws IOException if any other error occurs when retrieving the file
    -	 */
    -	File getFile(BlobKey key) throws IOException;
    -
    -	/**
    -	 * Returns the path to a local copy of the file associated with the provided job ID and blob key.
    +	 * Returns a BLOB service for accessing permanent BLOBs.
     	 *
    -	 * @param jobId ID of the job this blob belongs to
    -	 * @param key blob key associated with the requested file
    -	 * @return The path to the file.
    -	 * @throws java.io.FileNotFoundException when the path does not exist;
    -	 * @throws IOException if any other error occurs when retrieving the file
    +	 * @return BLOB service
     	 */
    -	File getFile(JobID jobId, BlobKey key) throws IOException;
    +	PermanentBlobService getPermanentBlobStore();
    --- End diff --
    
    I'm not so sure about the naming here. The `BlobStore` is actually something else than the `BlobService`. Would be good if we use a consistent naming for the different things (this could also include renaming some of the entities).


---

[GitHub] flink issue #4358: [FLINK-7068][blob] change BlobService sub-classes for per...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4358
  
    Rebased and extended the PR as requested - the last two commits contain the changes compared to the last review.
    
    I tried to clean up some of the commits for a better merge but please note that this PR also includes #4568, #4238 and #4402 fixes and commits. For some of those, changes were applied afterwards in the review process which will cause conflicts in their respective PRs that I will close (fixed after merging this PR).


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r137287126
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.blob;
    +
    +import org.apache.flink.api.common.JobID;
    +
    +import java.io.Closeable;
    +import java.io.File;
    +import java.io.IOException;
    +
    +/**
    + * A service to retrieve permanent binary large objects (BLOBs).
    + * <p>
    + * These include per-job BLOBs that are covered by high-availability (HA) mode, e.g. a job's JAR
    + * files, parts of an off-loaded {@link org.apache.flink.runtime.deployment.TaskDeploymentDescriptor}
    + * or files in the {@link org.apache.flink.api.common.cache.DistributedCache}.
    + */
    +public interface PermanentBlobService extends Closeable {
    +
    +	/**
    +	 * Returns the path to a local copy of the file associated with the provided job ID and blob
    +	 * key.
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to
    +	 * @param key
    +	 * 		BLOB key associated with the requested file
    +	 *
    +	 * @return The path to the file.
    +	 *
    +	 * @throws java.io.FileNotFoundException
    +	 * 		if the BLOB does not exist;
    +	 * @throws IOException
    +	 * 		if any other error occurs when retrieving the file
    +	 */
    +	File getHAFile(JobID jobId, BlobKey key) throws IOException;
    --- End diff --
    
    Not sure whether this is the right name because HA does not depend on the `PermanentBlobService` but on the the `BlobStore`. I would suggest to rename it.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r137271569
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
    @@ -389,95 +413,332 @@ public File getFile(JobID jobId, BlobKey key) throws IOException {
     	 *
     	 * @param jobId
     	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
    -	 * @param requiredBlob
    +	 * @param blobKey
     	 * 		blob key associated with the requested file
    +	 * @param highlyAvailable
    +	 * 		whether to the requested file is highly available (HA)
     	 *
     	 * @return file referring to the local storage location of the BLOB
     	 *
     	 * @throws IOException
     	 * 		Thrown if the file retrieval failed.
     	 */
    -	private File getFileInternal(@Nullable JobID jobId, BlobKey requiredBlob) throws IOException {
    -		checkArgument(requiredBlob != null, "BLOB key cannot be null.");
    +	private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable) throws IOException {
    +		checkArgument(blobKey != null, "BLOB key cannot be null.");
     
    -		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, requiredBlob);
    +		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
    +		readWriteLock.readLock().lock();
     
    -		if (localFile.exists()) {
    +		try {
    +			getFileInternal(jobId, blobKey, highlyAvailable, localFile);
     			return localFile;
    +		} finally {
    +			readWriteLock.readLock().unlock();
     		}
    -		else {
    +	}
    +
    +	/**
    +	 * Helper to retrieve the local path of a file associated with a job and a blob key.
    +	 * <p>
    +	 * The blob server looks the blob key up in its local storage. If the file exists, it is
    +	 * returned. If the file does not exist, it is retrieved from the HA blob store (if available)
    +	 * or a {@link FileNotFoundException} is thrown.
    +	 * <p>
    +	 * <strong>Assumes the read lock has already been acquired.</strong>
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
    +	 * @param blobKey
    +	 * 		blob key associated with the requested file
    +	 * @param highlyAvailable
    +	 * 		whether to the requested file is highly available (HA)
    +	 * @param localFile
    +	 *      (local) file where the blob is/should be stored
    +	 *
    +	 * @throws IOException
    +	 * 		Thrown if the file retrieval failed.
    +	 */
    +	void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable, File localFile) throws IOException {
    +		// assume readWriteLock.readLock() was already locked (cannot really check that)
    +
    +		if (localFile.exists()) {
    +			return;
    +		} else if (highlyAvailable) {
    +			// Try the HA blob store
    +			// first we have to release the read lock in order to acquire the write lock
    +			readWriteLock.readLock().unlock();
    +
    +			// use a temporary file (thread-safe without locking)
    +			File incomingFile = null;
     			try {
    -				// Try the blob store
    -				blobStore.get(jobId, requiredBlob, localFile);
    +				incomingFile = createTemporaryFilename();
    +				blobStore.get(jobId, blobKey, incomingFile);
    +
    +				BlobUtils.moveTempFileToStore(
    +					incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), LOG, null);
    --- End diff --
    
    Not sure whether the `writeLock` should escape the scope of the BlobServer via `BlobUtils.moveTempFileStore`. I think it would be better to lock outside of the `moveTempFileToStore` method. This should also give a better separation of concerns.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r139969076
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java ---
    @@ -0,0 +1,322 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.blob;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.FileUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.net.InetSocketAddress;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Provides access to transient BLOB files stored at the {@link BlobServer}.
    + *
    + * TODO: currently, this is still cache-based with local copies - make this truly transient, i.e. return file streams with no local copy
    + */
    +public class TransientBlobCache implements TransientBlobService {
    +
    +	/** The log object used for debugging. */
    +	private static final Logger LOG = LoggerFactory.getLogger(TransientBlobCache.class);
    +
    +	/** Counter to generate unique names for temporary files. */
    +	private final AtomicLong tempFileCounter = new AtomicLong(0);
    +
    +	private final InetSocketAddress serverAddress;
    +
    +	/**
    +	 * Root directory for local file storage
    +	 */
    +	private final File storageDir;
    +
    +	private final AtomicBoolean shutdownRequested = new AtomicBoolean();
    +
    +	/** Shutdown hook thread to ensure deletion of the local storage directory. */
    +	private final Thread shutdownHook;
    +
    +	/** The number of retries when the transfer fails */
    +	private final int numFetchRetries;
    +
    +	/** Configuration for the blob client like ssl parameters required to connect to the blob server */
    +	private final Configuration blobClientConfig;
    +
    +	/** Lock guarding concurrent file accesses */
    +	private final ReadWriteLock readWriteLock;
    +
    +	/**
    +	 * Instantiates a new BLOB cache.
    +	 *
    +	 * @param serverAddress
    +	 * 		address of the {@link BlobServer} to use for fetching files from
    +	 * @param blobClientConfig
    +	 * 		global configuration
    +	 *
    +	 * @throws IOException
    +	 * 		thrown if the (local or distributed) file storage cannot be created or is not usable
    +	 */
    +	public TransientBlobCache(
    +			final InetSocketAddress serverAddress,
    +			final Configuration blobClientConfig) throws IOException {
    --- End diff --
    
    Yes, that's mostly why. And passing all parameters that the `BlobClient` requires is also not too nice, cumbersome and error-prone.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r137259815
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java ---
    @@ -257,7 +265,88 @@ public void testBlobServerRecovery() throws Exception {
     		try {
     			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
     
    -			BlobRecoveryITCase.testBlobServerRecovery(config, blobStoreService);
    +			BlobServerRecoveryTest.testBlobServerRecovery(config, blobStoreService);
    +		} finally {
    +			if (blobStoreService != null) {
    +				blobStoreService.closeAndCleanupAllData();
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed corrupted JARs are
    +	 * recognised during the download via a {@link org.apache.flink.runtime.blob.BlobServer}.
    +	 */
    +	@Test
    +	public void testBlobServerCorruptedFile() throws Exception {
    +		org.apache.flink.configuration.Configuration
    +			config = new org.apache.flink.configuration.Configuration();
    +		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
    +		config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
    +		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
    +			temporaryFolder.newFolder().getAbsolutePath());
    +		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
    +
    +		BlobStoreService blobStoreService = null;
    +
    +		try {
    +			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
    +
    +			BlobServerCorruptionTest.testGetFailsFromCorruptFile(config, blobStoreService, exception);
    +		} finally {
    +			if (blobStoreService != null) {
    +				blobStoreService.closeAndCleanupAllData();
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from any
    +	 * participating BlobServer when uploaded via a {@link org.apache.flink.runtime.blob.BlobCache}.
    +	 */
    +	@Test
    +	public void testBlobCacheRecovery() throws Exception {
    +		org.apache.flink.configuration.Configuration
    +			config = new org.apache.flink.configuration.Configuration();
    +		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
    +		config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
    --- End diff --
    
    Statebackend not defined.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r139956243
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
    @@ -601,7 +564,39 @@ public void deleteInternal(@Nullable JobID jobId, BlobKey key) throws IOExceptio
     	}
     
     	/**
    -	 * Uploads the JAR files to a {@link BlobServer} at the given address.
    +	 * Reads the response from the input stream and throws in case of errors
    +	 *
    +	 * @param is
    +	 * 		stream to read from
    +	 *
    +	 * @return  <tt>true</tt> if the delete operation was successful at the {@link BlobServer};
    +	 *          <tt>false</tt> otherwise
    +	 *
    +	 * @throws IOException
    +	 * 		if the server code throws an exception or if reading the response failed
    +	 */
    +	private static boolean receiveAndCheckDeleteResponse(InputStream is) throws IOException {
    +		int response = is.read();
    +		if (response < 0) {
    +			throw new EOFException("Premature end of response");
    +		}
    +		if (response == RETURN_ERROR) {
    +			Throwable cause = readExceptionFromStream(is);
    +			if (cause == null) {
    +				return false;
    +			} else {
    +				throw new IOException("Server side error: " + cause.getMessage(), cause);
    --- End diff --
    
    I'm tempted to leave it that way not only because previous code followed that pattern (as in `BlobClient#receiveAndCheckGetResponse()`) but also because it is nice to have a brief failure message with some details without having to dig through the stack trace.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r137262572
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -158,7 +151,7 @@ protected void respondAsLeader(final ChannelHandlerContext ctx, final Routed rou
     			cache = blobPortFuture.thenApplyAsync(
     				(Integer port) -> {
     					try {
    -						return new BlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), port), config, blobView);
    +						return new TransientBlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), port), config);
     					} catch (IOException e) {
     						throw new FlinkFutureException("Could not create BlobCache.", e);
    --- End diff --
    
    Maybe we could adapt the exception message to `TransientBlobCache`.


---

[GitHub] flink issue #4358: [FLINK-7068][blob] change BlobService sub-classes for per...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4358
  
    Rebased onto `master`, but had to drag in #4402 early to fix the end-to-end tests failing due to spurious warnings. The test failure you observed was actually a test instability introduced with #4238 for which I added a hotfix to this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142139191
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java ---
    @@ -142,7 +128,64 @@ public void testJobCleanup() throws IOException, InterruptedException {
     	}
     
     	/**
    -	 * Tests that {@link BlobCache} cleans up after calling {@link BlobCache#releaseJob(JobID)}
    +	 * Tests that {@link BlobCache} sets the expected reference counts and cleanup timeouts when
    --- End diff --
    
    Rename `BlobCache` to `PermanentBlobCache`.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r139968617
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java ---
    @@ -0,0 +1,429 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.blob;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.FileUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Provides a cache for permanent BLOB files including a per-job ref-counting and a staged cleanup.
    + * <p>
    + * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache will first attempt to
    + * serve the file from its local cache. Only if the local cache does not contain the desired BLOB,
    + * it will try to download it from a distributed HA file system (if available) or the BLOB server.
    + * <p>
    + * If files for a job are not needed any more, they will enter a staged, i.e. deferred, cleanup.
    + * Files may thus still be be accessible upon recovery and do not need to be re-downloaded.
    + */
    +public class PermanentBlobCache extends TimerTask implements PermanentBlobService {
    +
    +	/** The log object used for debugging. */
    +	private static final Logger LOG = LoggerFactory.getLogger(PermanentBlobCache.class);
    +
    +	/** Counter to generate unique names for temporary files. */
    +	private final AtomicLong tempFileCounter = new AtomicLong(0);
    +
    +	private final InetSocketAddress serverAddress;
    +
    +	/** Root directory for local file storage */
    +	private final File storageDir;
    +
    +	/** Blob store for distributed file storage, e.g. in HA */
    +	private final BlobView blobView;
    +
    +	private final AtomicBoolean shutdownRequested = new AtomicBoolean();
    +
    +	/** Shutdown hook thread to ensure deletion of the storage directory. */
    +	private final Thread shutdownHook;
    +
    +	/** The number of retries when the transfer fails */
    +	private final int numFetchRetries;
    +
    +	/** Configuration for the blob client like ssl parameters required to connect to the blob server */
    +	private final Configuration blobClientConfig;
    +
    +	/** Lock guarding concurrent file accesses */
    +	private final ReadWriteLock readWriteLock;
    +
    +	// --------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Job reference counters with a time-to-live (TTL).
    +	 */
    +	@VisibleForTesting
    +	static class RefCount {
    +		/**
    +		 * Number of references to a job.
    +		 */
    +		public int references = 0;
    +
    +		/**
    +		 * Timestamp in milliseconds when any job data should be cleaned up (no cleanup for
    +		 * non-positive values).
    +		 */
    +		public long keepUntil = -1;
    +	}
    +
    +	/** Map to store the number of references to a specific job */
    +	private final Map<JobID, RefCount> jobRefCounters = new HashMap<>();
    +
    +	/** Time interval (ms) to run the cleanup task; also used as the default TTL. */
    +	private final long cleanupInterval;
    +
    +	private final Timer cleanupTimer;
    +
    +	/**
    +	 * Instantiates a new cache for permanent BLOBs which are also available in an HA store.
    +	 *
    +	 * @param serverAddress
    +	 * 		address of the {@link BlobServer} to use for fetching files from
    +	 * @param blobClientConfig
    +	 * 		global configuration
    +	 * @param blobView
    +	 * 		(distributed) HA blob store file system to retrieve files from first
    +	 *
    +	 * @throws IOException
    +	 * 		thrown if the (local or distributed) file storage cannot be created or is not usable
    +	 */
    +	public PermanentBlobCache(
    +		final InetSocketAddress serverAddress,
    +		final Configuration blobClientConfig,
    +		final BlobView blobView) throws IOException {
    +
    +		this.serverAddress = checkNotNull(serverAddress);
    +		this.blobClientConfig = checkNotNull(blobClientConfig);
    +		this.blobView = checkNotNull(blobView, "blobStore");
    +		this.readWriteLock = new ReentrantReadWriteLock();
    +
    +		// configure and create the storage directory
    +		String storageDirectory = blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY);
    +		this.storageDir = BlobUtils.initLocalStorageDirectory(storageDirectory);
    +		LOG.info("Created permanent BLOB cache storage directory " + storageDir);
    +
    +		// configure the number of fetch retries
    +		final int fetchRetries = blobClientConfig.getInteger(BlobServerOptions.FETCH_RETRIES);
    +		if (fetchRetries >= 0) {
    +			this.numFetchRetries = fetchRetries;
    +		} else {
    +			LOG.warn("Invalid value for {}. System will attempt no retries on failed fetch operations of BLOBs.",
    +				BlobServerOptions.FETCH_RETRIES.key());
    +			this.numFetchRetries = 0;
    +		}
    +
    +		// Initializing the clean up task
    +		this.cleanupTimer = new Timer(true);
    +
    +		this.cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
    +		this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
    +
    +		// Add shutdown hook to delete storage directory
    +		shutdownHook = BlobUtils.addShutdownHook(this, LOG);
    +	}
    +
    +	/**
    +	 * Registers use of job-related BLOBs.
    +	 * <p>
    +	 * Using any other method to access BLOBs, e.g. {@link #getHAFile}, is only valid within calls
    +	 * to <tt>registerJob(JobID)</tt> and {@link #releaseJob(JobID)}.
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to
    +	 *
    +	 * @see #releaseJob(JobID)
    +	 */
    +	public void registerJob(JobID jobId) {
    +		synchronized (jobRefCounters) {
    +			RefCount ref = jobRefCounters.get(jobId);
    +			if (ref == null) {
    +				ref = new RefCount();
    +				jobRefCounters.put(jobId, ref);
    +			} else {
    +				// reset cleanup timeout
    +				ref.keepUntil = -1;
    +			}
    +			++ref.references;
    +		}
    +	}
    +
    +	/**
    +	 * Unregisters use of job-related BLOBs and allow them to be released.
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to
    +	 *
    +	 * @see #registerJob(JobID)
    +	 */
    +	public void releaseJob(JobID jobId) {
    --- End diff --
    
    With the current code, it wouldn't have mattered, whether the counter is below zero since the cleanup logic inside `PermanentBlobCache#run()` accounted for that and `releaseJob` only set the cleanup time once. It should, however, also considered an improper use case and we may guard against it.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142126300
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.blob;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.FileUtils;
    +
    +import org.slf4j.Logger;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.Closeable;
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Abstract base class for permanent and transient BLOB files.
    + */
    +public abstract class AbstractBlobCache implements Closeable {
    +
    +	/**
    +	 * The log object used for debugging.
    +	 */
    +	protected final Logger LOG;
    --- End diff --
    
    I think only static variables should be written in capital letters.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4358


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r128296027
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java ---
    @@ -0,0 +1,391 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.blob;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.FileUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Provides a cache for permanent BLOB files including a per-job ref-counting and a staged cleanup.
    + * <p>
    + * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache will first attempt to
    + * serve the file from its local cache. Only if the local cache does not contain the desired BLOB,
    + * it will try to download it from a distributed HA file system (if available) or the BLOB server.
    + * <p>
    + * If files for a job are not needed any more, they will enter a staged, i.e. deferred, cleanup.
    + * Files may thus still be be accessible upon recovery and do not need to be re-downloaded.
    + */
    +public class PermanentBlobCache extends TimerTask implements PermanentBlobService {
    +
    +	/** The log object used for debugging. */
    +	private static final Logger LOG = LoggerFactory.getLogger(PermanentBlobCache.class);
    +
    +	/** Counter to generate unique names for temporary files. */
    +	private final AtomicLong tempFileCounter = new AtomicLong(0);
    +
    +	private final InetSocketAddress serverAddress;
    +
    +	/** Root directory for local file storage */
    +	private final File storageDir;
    +
    +	/** Blob store for distributed file storage, e.g. in HA */
    +	private final BlobView blobView;
    +
    +	private final AtomicBoolean shutdownRequested = new AtomicBoolean();
    +
    +	/** Shutdown hook thread to ensure deletion of the storage directory. */
    +	private final Thread shutdownHook;
    +
    +	/** The number of retries when the transfer fails */
    +	private final int numFetchRetries;
    +
    +	/** Configuration for the blob client like ssl parameters required to connect to the blob server */
    +	private final Configuration blobClientConfig;
    +
    +	/** Lock guarding concurrent file accesses */
    +	private final ReadWriteLock readWriteLock;
    +
    +	// --------------------------------------------------------------------------------------------
    +
    +	/** The global lock to synchronize operations */
    +	private final Object lockObject = new Object();
    +
    +	private static class RefCount {
    +		public int references = 0;
    +		public long keepUntil = Long.MAX_VALUE;
    +	}
    +
    +	/** Map to store the number of references to a specific job */
    +	private final Map<JobID, RefCount> jobRefCounters = new HashMap<>();
    +
    +	/** Time interval (ms) to run the cleanup task; also used as the default TTL. */
    +	private final long cleanupInterval;
    +
    +	private final Timer cleanupTimer;
    +
    +	/**
    +	 * Instantiates a new cache for permanent BLOBs which are also available in an HA store.
    +	 *
    +	 * @param serverAddress
    +	 * 		address of the {@link BlobServer} to use for fetching files from
    +	 * @param blobClientConfig
    +	 * 		global configuration
    +	 * @param blobView
    +	 * 		(distributed) HA blob store file system to retrieve files from first
    +	 *
    +	 * @throws IOException
    +	 * 		thrown if the (local or distributed) file storage cannot be created or is not usable
    +	 */
    +	public PermanentBlobCache(
    +		final InetSocketAddress serverAddress,
    +		final Configuration blobClientConfig,
    +		final BlobView blobView) throws IOException {
    +
    +		this.serverAddress = checkNotNull(serverAddress);
    +		this.blobClientConfig = checkNotNull(blobClientConfig);
    +		this.blobView = checkNotNull(blobView, "blobStore");
    +		this.readWriteLock = new ReentrantReadWriteLock();
    +
    +		// configure and create the storage directory
    +		String storageDirectory = blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY);
    +		this.storageDir = BlobUtils.initLocalStorageDirectory(storageDirectory);
    +		LOG.info("Created permanent BLOB cache storage directory " + storageDir);
    +
    +		// configure the number of fetch retries
    +		final int fetchRetries = blobClientConfig.getInteger(BlobServerOptions.FETCH_RETRIES);
    +		if (fetchRetries >= 0) {
    +			this.numFetchRetries = fetchRetries;
    +		} else {
    +			LOG.warn("Invalid value for {}. System will attempt no retries on failed fetch operations of BLOBs.",
    +				BlobServerOptions.FETCH_RETRIES.key());
    +			this.numFetchRetries = 0;
    +		}
    +
    +		// Initializing the clean up task
    +		this.cleanupTimer = new Timer(true);
    +
    +		this.cleanupInterval = blobClientConfig.getLong(
    +			ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
    +			ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
    +		this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
    +
    +		// Add shutdown hook to delete storage directory
    +		shutdownHook = BlobUtils.addShutdownHook(this, LOG);
    +	}
    +
    +	/**
    +	 * Registers use of job-related BLOBs.
    +	 * <p>
    +	 * Using any other method to access BLOBs, e.g. {@link #getHAFile}, is only valid within calls
    +	 * to <tt>registerJob(JobID)</tt> and {@link #releaseJob(JobID)}.
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to
    +	 *
    +	 * @see #releaseJob(JobID)
    +	 */
    +	public void registerJob(JobID jobId) {
    +		synchronized (lockObject) {
    +			RefCount ref = jobRefCounters.get(jobId);
    +			if (ref == null) {
    +				ref = new RefCount();
    +				jobRefCounters.put(jobId, ref);
    +			}
    +			++ref.references;
    +		}
    +	}
    +
    +	/**
    +	 * Unregisters use of job-related BLOBs and allow them to be released.
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to
    +	 *
    +	 * @see #registerJob(JobID)
    +	 */
    +	public void releaseJob(JobID jobId) {
    +		synchronized (lockObject) {
    +			RefCount ref = jobRefCounters.get(jobId);
    +
    +			if (ref == null) {
    +				LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls");
    +				return;
    +			}
    +
    +			--ref.references;
    +			if (ref.references == 0) {
    +				ref.keepUntil = System.currentTimeMillis() + cleanupInterval;
    +			}
    +		}
    +	}
    +
    +	public int getNumberOfReferenceHolders(JobID jobId) {
    +		synchronized (lockObject) {
    +			RefCount ref = jobRefCounters.get(jobId);
    +			if (ref == null) {
    +				return 0;
    +			} else {
    +				return ref.references;
    +			}
    +		}
    +	}
    +
    +	public int getNumberOfCachedJobs() {
    +		return jobRefCounters.size();
    +	}
    +
    +	/**
    +	 * Returns the path to a local copy of the file associated with the provided job ID and blob
    +	 * key.
    +	 * <p>
    +	 * We will first attempt to serve the BLOB from the local storage. If the BLOB is not in
    +	 * there, we will try to download it from the HA store, or directly from the {@link BlobServer}.
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to
    +	 * @param key
    +	 * 		blob key associated with the requested file
    +	 *
    +	 * @return The path to the file.
    +	 *
    +	 * @throws java.io.FileNotFoundException
    +	 * 		if the BLOB does not exist;
    +	 * @throws IOException
    +	 * 		if any other error occurs when retrieving the file
    +	 */
    +	@Override
    +	public File getHAFile(JobID jobId, BlobKey key) throws IOException {
    +		checkNotNull(jobId);
    +		return getHAFileInternal(jobId, key);
    +	}
    +
    +	/**
    +	 * Returns local copy of the file for the BLOB with the given key.
    +	 * <p>
    +	 * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in
    +	 * the cache, the method will try to download it from this cache's BLOB server.
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
    +	 * @param blobKey
    +	 * 		The key of the desired BLOB.
    +	 *
    +	 * @return file referring to the local storage location of the BLOB.
    +	 *
    +	 * @throws IOException
    +	 * 		Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
    +	 */
    +	private File getHAFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOException {
    +		checkArgument(blobKey != null, "BLOB key cannot be null.");
    +
    +		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
    +		readWriteLock.readLock().lock();
    +
    +		try {
    +			if (localFile.exists()) {
    +				return localFile;
    +			}
    +		} finally {
    +			readWriteLock.readLock().unlock();
    +		}
    +
    +		// first try the distributed blob store (if available)
    +		// use a temporary file (thread-safe without locking)
    +		File incomingFile = createTemporaryFilename();
    +		try {
    +			try {
    +				blobView.get(jobId, blobKey, incomingFile);
    +				BlobUtils.moveTempFileToStore(
    +					incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), LOG, null);
    +
    +				return localFile;
    +			} catch (Exception e) {
    +				LOG.info("Failed to copy from blob store. Downloading from BLOB server instead.", e);
    +			}
    +
    +			// fallback: download from the BlobServer
    +			BlobClient.downloadFromBlobServer(
    +				jobId, blobKey, true, incomingFile, serverAddress, blobClientConfig, numFetchRetries);
    +			BlobUtils.moveTempFileToStore(
    +				incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), LOG, null);
    +
    +			return localFile;
    +		} finally {
    +			// delete incomingFile from a failed download
    +			if (!incomingFile.delete() && incomingFile.exists()) {
    +				LOG.warn("Could not delete the staging file {} for blob key {} and job {}.",
    +					incomingFile, blobKey, jobId);
    +			}
    +		}
    +	}
    +
    +	public int getPort() {
    +		return serverAddress.getPort();
    +	}
    +
    +	/**
    +	 * Returns a file handle to the file associated with the given blob key on the blob
    +	 * server.
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
    +	 * @param key
    +	 * 		identifying the file
    +	 *
    +	 * @return file handle to the file
    +	 *
    +	 * @throws IOException
    +	 * 		if creating the directory fails
    +	 */
    +	@VisibleForTesting
    +	public File getStorageLocation(JobID jobId, BlobKey key) throws IOException {
    +		checkNotNull(jobId);
    +		return BlobUtils.getStorageLocation(storageDir, jobId, key);
    +	}
    +
    +	/**
    +	 * Returns a temporary file inside the BLOB server's incoming directory.
    +	 *
    +	 * @return a temporary file inside the BLOB server's incoming directory
    +	 *
    +	 * @throws IOException
    +	 * 		if creating the directory fails
    +	 */
    +	File createTemporaryFilename() throws IOException {
    +		return new File(BlobUtils.getIncomingDirectory(storageDir),
    +			String.format("temp-%08d", tempFileCounter.getAndIncrement()));
    +	}
    +
    +	/**
    +	 * Cleans up BLOBs which are not referenced anymore.
    +	 */
    +	@Override
    +	public void run() {
    +		synchronized (lockObject) {
    +			Iterator<Map.Entry<JobID, RefCount>> entryIter = jobRefCounters.entrySet().iterator();
    +
    +			while (entryIter.hasNext()) {
    +				Map.Entry<JobID, RefCount> entry = entryIter.next();
    +				RefCount ref = entry.getValue();
    +
    +				if (ref.references <= 0 && System.currentTimeMillis() >= ref.keepUntil) {
    +					JobID jobId = entry.getKey();
    +
    +					final File localFile =
    +						new File(BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId));
    +					try {
    +						FileUtils.deleteDirectory(localFile);
    --- End diff --
    
    note to self: this should also be guarded by the `readWriteLock.writeLock()`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r139962735
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
    @@ -389,95 +413,332 @@ public File getFile(JobID jobId, BlobKey key) throws IOException {
     	 *
     	 * @param jobId
     	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
    -	 * @param requiredBlob
    +	 * @param blobKey
     	 * 		blob key associated with the requested file
    +	 * @param highlyAvailable
    +	 * 		whether to the requested file is highly available (HA)
     	 *
     	 * @return file referring to the local storage location of the BLOB
     	 *
     	 * @throws IOException
     	 * 		Thrown if the file retrieval failed.
     	 */
    -	private File getFileInternal(@Nullable JobID jobId, BlobKey requiredBlob) throws IOException {
    -		checkArgument(requiredBlob != null, "BLOB key cannot be null.");
    +	private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable) throws IOException {
    +		checkArgument(blobKey != null, "BLOB key cannot be null.");
     
    -		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, requiredBlob);
    +		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
    +		readWriteLock.readLock().lock();
     
    -		if (localFile.exists()) {
    +		try {
    +			getFileInternal(jobId, blobKey, highlyAvailable, localFile);
     			return localFile;
    +		} finally {
    +			readWriteLock.readLock().unlock();
     		}
    -		else {
    +	}
    +
    +	/**
    +	 * Helper to retrieve the local path of a file associated with a job and a blob key.
    +	 * <p>
    +	 * The blob server looks the blob key up in its local storage. If the file exists, it is
    +	 * returned. If the file does not exist, it is retrieved from the HA blob store (if available)
    +	 * or a {@link FileNotFoundException} is thrown.
    +	 * <p>
    +	 * <strong>Assumes the read lock has already been acquired.</strong>
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
    +	 * @param blobKey
    +	 * 		blob key associated with the requested file
    +	 * @param highlyAvailable
    +	 * 		whether to the requested file is highly available (HA)
    +	 * @param localFile
    +	 *      (local) file where the blob is/should be stored
    +	 *
    +	 * @throws IOException
    +	 * 		Thrown if the file retrieval failed.
    +	 */
    +	void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable, File localFile) throws IOException {
    +		// assume readWriteLock.readLock() was already locked (cannot really check that)
    +
    +		if (localFile.exists()) {
    +			return;
    +		} else if (highlyAvailable) {
    +			// Try the HA blob store
    +			// first we have to release the read lock in order to acquire the write lock
    +			readWriteLock.readLock().unlock();
    +
    +			// use a temporary file (thread-safe without locking)
    +			File incomingFile = null;
     			try {
    -				// Try the blob store
    -				blobStore.get(jobId, requiredBlob, localFile);
    +				incomingFile = createTemporaryFilename();
    +				blobStore.get(jobId, blobKey, incomingFile);
    +
    +				BlobUtils.moveTempFileToStore(
    +					incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), LOG, null);
    --- End diff --
    
    Agreed - it's not too nice. The cost of changing that would be another try-catch block duplication at every use of this method though, so all usages would have this instead:
    ```
    writeLock.lock();
    try {
    	BlobUtils.moveTempFileToStore(incomingFile, jobId, blobKey, storageFile, LOG, blobStore);
    } finally {
    	writeLock.unlock();
    }
    ```
    
    `BlobUtils` on the other hand is already quiet tightly coupled with the `BlobServer`/`Cache` classes anyway with mostly package-private methods for only those ... not quite sure what would be better and went this way.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142134172
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
    @@ -375,7 +378,32 @@ public File getFile(BlobKey key) throws IOException {
     	 * 		Thrown if the file retrieval failed.
     	 */
     	@Override
    -	public File getFile(JobID jobId, BlobKey key) throws IOException {
    +	public File getTransientFile(JobID jobId, BlobKey key) throws IOException {
    +		checkNotNull(jobId);
    --- End diff --
    
    What about making the usage of blob keys failsafe by having different types of `BlobKey`. E.g. `TransientBlobKey` and `PermanentBlobKey`?


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r139957010
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
    @@ -174,7 +176,6 @@ public ServerSocket createSocket(int port) throws IOException {
     		// start the server thread
     		setName("BLOB Server listener at " + getPort());
     		setDaemon(true);
    -		start();
    --- End diff --
    
    Basically yes but this pattern does not work well with classes inheriting from `BlobServer`. Here, this is only the case for the `TestingFailingBlobServer` which sets additional properties and relies on them being set while executing `run()`. If started in the constructor, the `run()` method would work on a partially initialised object.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142128462
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
    @@ -18,89 +18,21 @@
     
     package org.apache.flink.runtime.blob;
     
    -import org.apache.flink.annotation.VisibleForTesting;
    -import org.apache.flink.api.common.JobID;
    -import org.apache.flink.configuration.BlobServerOptions;
     import org.apache.flink.configuration.Configuration;
    -import org.apache.flink.util.FileUtils;
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
     
    -import javax.annotation.Nullable;
    -import java.io.File;
    -import java.io.FileOutputStream;
     import java.io.IOException;
    -import java.io.InputStream;
    -import java.io.OutputStream;
     import java.net.InetSocketAddress;
    -import java.util.HashMap;
    -import java.util.Iterator;
    -import java.util.Map;
    -import java.util.Timer;
    -import java.util.TimerTask;
    -import java.util.concurrent.atomic.AtomicBoolean;
    -
    -import static org.apache.flink.util.Preconditions.checkArgument;
    -import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
    - * The BLOB cache implements a local cache for content-addressable BLOBs.
    - *
    - * <p>When requesting BLOBs through the {@link BlobCache#getFile} methods, the
    - * BLOB cache will first attempt to serve the file from its local cache. Only if
    - * the local cache does not contain the desired BLOB, the BLOB cache will try to
    - * download it from a distributed file system (if available) or the BLOB
    - * server.</p>
    + * The BLOB cache provides access to BLOB services for permanent and transient BLOBs.
      */
    -public class BlobCache extends TimerTask implements BlobService {
    -
    -	/** The log object used for debugging. */
    -	private static final Logger LOG = LoggerFactory.getLogger(BlobCache.class);
    -
    -	private final InetSocketAddress serverAddress;
    -
    -	/** Root directory for local file storage */
    -	private final File storageDir;
    -
    -	/** Blob store for distributed file storage, e.g. in HA */
    -	private final BlobView blobView;
    -
    -	private final AtomicBoolean shutdownRequested = new AtomicBoolean();
    -
    -	/** Shutdown hook thread to ensure deletion of the storage directory. */
    -	private final Thread shutdownHook;
    -
    -	/** The number of retries when the transfer fails */
    -	private final int numFetchRetries;
    -
    -	/** Configuration for the blob client like ssl parameters required to connect to the blob server */
    -	private final Configuration blobClientConfig;
    -
    -	// --------------------------------------------------------------------------------------------
    -
    -	/**
    -	 * Job reference counters with a time-to-live (TTL).
    -	 */
    -	private static class RefCount {
    -		/**
    -		 * Number of references to a job.
    -		 */
    -		public int references = 0;
    -		
    -		/**
    -		 * Timestamp in milliseconds when any job data should be cleaned up (no cleanup for
    -		 * non-positive values).
    -		 */
    -		public long keepUntil = -1;
    -	}
    -
    -	/** Map to store the number of references to a specific job */
    -	private final Map<JobID, RefCount> jobRefCounters = new HashMap<>();
    +public class BlobCache implements BlobService {
     
    -	/** Time interval (ms) to run the cleanup task; also used as the default TTL. */
    -	private final long cleanupInterval;
    +	/** Caching store for permanent BLOBs. */
    +	private final PermanentBlobCache permanentBlobStore;
     
    -	private final Timer cleanupTimer;
    +	/** Store for transient BLOB files. */
    +	private final TransientBlobCache transientBlobStore;
    --- End diff --
    
    Same here.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142125479
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -158,9 +151,9 @@ protected void respondAsLeader(final ChannelHandlerContext ctx, final Routed rou
     			cache = blobPortFuture.thenApplyAsync(
     				(Integer port) -> {
     					try {
    -						return new BlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), port), config, blobView);
    +						return new TransientBlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), port), config);
     					} catch (IOException e) {
    -						throw new FlinkFutureException("Could not create BlobCache.", e);
    +						throw new FlinkFutureException("Could not create TransientBlobCache.", e);
    --- End diff --
    
    When rebasing, then this will be a `java.util.concurrent.CompletionException`.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142129292
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
    @@ -120,6 +125,76 @@ public BlobClient(InetSocketAddress serverAddress, Configuration clientConfig) t
     		}
     	}
     
    +	/**
    +	 * Downloads the given BLOB from the given server and stores its contents to a (local) file.
    +	 *
    +	 * <p>Transient BLOB files are deleted after a successful copy of the server's data into the
    +	 * given <tt>localJarFile</tt>.
    +	 *
    +	 * @param jobId
    +	 * 		job ID the BLOB belongs to or <tt>null</tt> if job-unrelated
    +	 * @param blobKey
    +	 * 		BLOB key
    +	 * @param localJarFile
    +	 * 		the local file to write to
    +	 * @param serverAddress
    +	 * 		address of the server to download from
    +	 * @param blobClientConfig
    +	 * 		client configuration for the connection
    +	 * @param numFetchRetries
    +	 * 		number of retries before failing
    +	 *
    +	 * @throws IOException
    +	 * 		if an I/O error occurs during the download
    +	 */
    +	static void downloadFromBlobServer(
    +			@Nullable JobID jobId, BlobKey blobKey, File localJarFile,
    +			InetSocketAddress serverAddress, Configuration blobClientConfig, int numFetchRetries)
    --- End diff --
    
    Wrapping the parameters could be one per line.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r137289123
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java ---
    @@ -0,0 +1,429 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.blob;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.FileUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Provides a cache for permanent BLOB files including a per-job ref-counting and a staged cleanup.
    + * <p>
    + * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache will first attempt to
    + * serve the file from its local cache. Only if the local cache does not contain the desired BLOB,
    + * it will try to download it from a distributed HA file system (if available) or the BLOB server.
    + * <p>
    + * If files for a job are not needed any more, they will enter a staged, i.e. deferred, cleanup.
    + * Files may thus still be be accessible upon recovery and do not need to be re-downloaded.
    + */
    +public class PermanentBlobCache extends TimerTask implements PermanentBlobService {
    +
    +	/** The log object used for debugging. */
    +	private static final Logger LOG = LoggerFactory.getLogger(PermanentBlobCache.class);
    +
    +	/** Counter to generate unique names for temporary files. */
    +	private final AtomicLong tempFileCounter = new AtomicLong(0);
    +
    +	private final InetSocketAddress serverAddress;
    +
    +	/** Root directory for local file storage */
    +	private final File storageDir;
    +
    +	/** Blob store for distributed file storage, e.g. in HA */
    +	private final BlobView blobView;
    +
    +	private final AtomicBoolean shutdownRequested = new AtomicBoolean();
    +
    +	/** Shutdown hook thread to ensure deletion of the storage directory. */
    +	private final Thread shutdownHook;
    +
    +	/** The number of retries when the transfer fails */
    +	private final int numFetchRetries;
    +
    +	/** Configuration for the blob client like ssl parameters required to connect to the blob server */
    +	private final Configuration blobClientConfig;
    +
    +	/** Lock guarding concurrent file accesses */
    +	private final ReadWriteLock readWriteLock;
    +
    +	// --------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Job reference counters with a time-to-live (TTL).
    +	 */
    +	@VisibleForTesting
    +	static class RefCount {
    +		/**
    +		 * Number of references to a job.
    +		 */
    +		public int references = 0;
    +
    +		/**
    +		 * Timestamp in milliseconds when any job data should be cleaned up (no cleanup for
    +		 * non-positive values).
    +		 */
    +		public long keepUntil = -1;
    +	}
    +
    +	/** Map to store the number of references to a specific job */
    +	private final Map<JobID, RefCount> jobRefCounters = new HashMap<>();
    +
    +	/** Time interval (ms) to run the cleanup task; also used as the default TTL. */
    +	private final long cleanupInterval;
    +
    +	private final Timer cleanupTimer;
    +
    +	/**
    +	 * Instantiates a new cache for permanent BLOBs which are also available in an HA store.
    +	 *
    +	 * @param serverAddress
    +	 * 		address of the {@link BlobServer} to use for fetching files from
    +	 * @param blobClientConfig
    +	 * 		global configuration
    +	 * @param blobView
    +	 * 		(distributed) HA blob store file system to retrieve files from first
    +	 *
    +	 * @throws IOException
    +	 * 		thrown if the (local or distributed) file storage cannot be created or is not usable
    +	 */
    +	public PermanentBlobCache(
    +		final InetSocketAddress serverAddress,
    +		final Configuration blobClientConfig,
    +		final BlobView blobView) throws IOException {
    +
    +		this.serverAddress = checkNotNull(serverAddress);
    +		this.blobClientConfig = checkNotNull(blobClientConfig);
    +		this.blobView = checkNotNull(blobView, "blobStore");
    +		this.readWriteLock = new ReentrantReadWriteLock();
    +
    +		// configure and create the storage directory
    +		String storageDirectory = blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY);
    +		this.storageDir = BlobUtils.initLocalStorageDirectory(storageDirectory);
    +		LOG.info("Created permanent BLOB cache storage directory " + storageDir);
    +
    +		// configure the number of fetch retries
    +		final int fetchRetries = blobClientConfig.getInteger(BlobServerOptions.FETCH_RETRIES);
    +		if (fetchRetries >= 0) {
    +			this.numFetchRetries = fetchRetries;
    +		} else {
    +			LOG.warn("Invalid value for {}. System will attempt no retries on failed fetch operations of BLOBs.",
    +				BlobServerOptions.FETCH_RETRIES.key());
    +			this.numFetchRetries = 0;
    +		}
    +
    +		// Initializing the clean up task
    +		this.cleanupTimer = new Timer(true);
    +
    +		this.cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
    +		this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
    +
    +		// Add shutdown hook to delete storage directory
    +		shutdownHook = BlobUtils.addShutdownHook(this, LOG);
    +	}
    +
    +	/**
    +	 * Registers use of job-related BLOBs.
    +	 * <p>
    +	 * Using any other method to access BLOBs, e.g. {@link #getHAFile}, is only valid within calls
    +	 * to <tt>registerJob(JobID)</tt> and {@link #releaseJob(JobID)}.
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to
    +	 *
    +	 * @see #releaseJob(JobID)
    +	 */
    +	public void registerJob(JobID jobId) {
    +		synchronized (jobRefCounters) {
    +			RefCount ref = jobRefCounters.get(jobId);
    +			if (ref == null) {
    +				ref = new RefCount();
    +				jobRefCounters.put(jobId, ref);
    +			} else {
    +				// reset cleanup timeout
    +				ref.keepUntil = -1;
    +			}
    +			++ref.references;
    +		}
    +	}
    +
    +	/**
    +	 * Unregisters use of job-related BLOBs and allow them to be released.
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to
    +	 *
    +	 * @see #registerJob(JobID)
    +	 */
    +	public void releaseJob(JobID jobId) {
    --- End diff --
    
    What happens if I call `releaseJob` multiple times for the same `jobId`? Couldn't it happen that the reference counter becomes negative?


---

[GitHub] flink issue #4358: [FLINK-7068][blob] change BlobService sub-classes for per...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4358
  
    I've merged #4238. Could you please rebase onto the latest master? Moreover, I've noticed that `JobManagerCleanupITCase.testBlobServerCleanupCancelledJob` failed in one of the Travis runs. Not sure if this is related.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r137295883
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java ---
    @@ -0,0 +1,429 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.blob;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.FileUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Provides a cache for permanent BLOB files including a per-job ref-counting and a staged cleanup.
    + * <p>
    + * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache will first attempt to
    + * serve the file from its local cache. Only if the local cache does not contain the desired BLOB,
    + * it will try to download it from a distributed HA file system (if available) or the BLOB server.
    + * <p>
    + * If files for a job are not needed any more, they will enter a staged, i.e. deferred, cleanup.
    + * Files may thus still be be accessible upon recovery and do not need to be re-downloaded.
    + */
    +public class PermanentBlobCache extends TimerTask implements PermanentBlobService {
    --- End diff --
    
    Just a wild thought: I noticed that the `TransientBlobCache` and the `PermanentBlobCache` have a lot of code in common. In order to reduce code duplication couldn't we create a common base class or let `PermanentBlobCache` extend `TransientBlobCache` adding the ref counting?


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r141406439
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java ---
    @@ -0,0 +1,429 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.blob;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.FileUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Provides a cache for permanent BLOB files including a per-job ref-counting and a staged cleanup.
    + * <p>
    + * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache will first attempt to
    + * serve the file from its local cache. Only if the local cache does not contain the desired BLOB,
    + * it will try to download it from a distributed HA file system (if available) or the BLOB server.
    + * <p>
    + * If files for a job are not needed any more, they will enter a staged, i.e. deferred, cleanup.
    + * Files may thus still be be accessible upon recovery and do not need to be re-downloaded.
    + */
    +public class PermanentBlobCache extends TimerTask implements PermanentBlobService {
    +
    +	/** The log object used for debugging. */
    +	private static final Logger LOG = LoggerFactory.getLogger(PermanentBlobCache.class);
    +
    +	/** Counter to generate unique names for temporary files. */
    +	private final AtomicLong tempFileCounter = new AtomicLong(0);
    +
    +	private final InetSocketAddress serverAddress;
    +
    +	/** Root directory for local file storage */
    +	private final File storageDir;
    +
    +	/** Blob store for distributed file storage, e.g. in HA */
    +	private final BlobView blobView;
    +
    +	private final AtomicBoolean shutdownRequested = new AtomicBoolean();
    +
    +	/** Shutdown hook thread to ensure deletion of the storage directory. */
    +	private final Thread shutdownHook;
    +
    +	/** The number of retries when the transfer fails */
    +	private final int numFetchRetries;
    +
    +	/** Configuration for the blob client like ssl parameters required to connect to the blob server */
    +	private final Configuration blobClientConfig;
    +
    +	/** Lock guarding concurrent file accesses */
    +	private final ReadWriteLock readWriteLock;
    +
    +	// --------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Job reference counters with a time-to-live (TTL).
    +	 */
    +	@VisibleForTesting
    +	static class RefCount {
    +		/**
    +		 * Number of references to a job.
    +		 */
    +		public int references = 0;
    +
    +		/**
    +		 * Timestamp in milliseconds when any job data should be cleaned up (no cleanup for
    +		 * non-positive values).
    +		 */
    +		public long keepUntil = -1;
    +	}
    +
    +	/** Map to store the number of references to a specific job */
    +	private final Map<JobID, RefCount> jobRefCounters = new HashMap<>();
    +
    +	/** Time interval (ms) to run the cleanup task; also used as the default TTL. */
    +	private final long cleanupInterval;
    +
    +	private final Timer cleanupTimer;
    +
    +	/**
    +	 * Instantiates a new cache for permanent BLOBs which are also available in an HA store.
    +	 *
    +	 * @param serverAddress
    +	 * 		address of the {@link BlobServer} to use for fetching files from
    +	 * @param blobClientConfig
    +	 * 		global configuration
    +	 * @param blobView
    +	 * 		(distributed) HA blob store file system to retrieve files from first
    +	 *
    +	 * @throws IOException
    +	 * 		thrown if the (local or distributed) file storage cannot be created or is not usable
    +	 */
    +	public PermanentBlobCache(
    +		final InetSocketAddress serverAddress,
    +		final Configuration blobClientConfig,
    +		final BlobView blobView) throws IOException {
    +
    +		this.serverAddress = checkNotNull(serverAddress);
    +		this.blobClientConfig = checkNotNull(blobClientConfig);
    +		this.blobView = checkNotNull(blobView, "blobStore");
    +		this.readWriteLock = new ReentrantReadWriteLock();
    +
    +		// configure and create the storage directory
    +		String storageDirectory = blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY);
    +		this.storageDir = BlobUtils.initLocalStorageDirectory(storageDirectory);
    +		LOG.info("Created permanent BLOB cache storage directory " + storageDir);
    +
    +		// configure the number of fetch retries
    +		final int fetchRetries = blobClientConfig.getInteger(BlobServerOptions.FETCH_RETRIES);
    +		if (fetchRetries >= 0) {
    +			this.numFetchRetries = fetchRetries;
    +		} else {
    +			LOG.warn("Invalid value for {}. System will attempt no retries on failed fetch operations of BLOBs.",
    +				BlobServerOptions.FETCH_RETRIES.key());
    +			this.numFetchRetries = 0;
    +		}
    +
    +		// Initializing the clean up task
    +		this.cleanupTimer = new Timer(true);
    +
    +		this.cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
    +		this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
    +
    +		// Add shutdown hook to delete storage directory
    +		shutdownHook = BlobUtils.addShutdownHook(this, LOG);
    +	}
    +
    +	/**
    +	 * Registers use of job-related BLOBs.
    +	 * <p>
    +	 * Using any other method to access BLOBs, e.g. {@link #getHAFile}, is only valid within calls
    +	 * to <tt>registerJob(JobID)</tt> and {@link #releaseJob(JobID)}.
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to
    +	 *
    +	 * @see #releaseJob(JobID)
    +	 */
    +	public void registerJob(JobID jobId) {
    +		synchronized (jobRefCounters) {
    +			RefCount ref = jobRefCounters.get(jobId);
    +			if (ref == null) {
    +				ref = new RefCount();
    +				jobRefCounters.put(jobId, ref);
    +			} else {
    +				// reset cleanup timeout
    +				ref.keepUntil = -1;
    +			}
    +			++ref.references;
    +		}
    +	}
    +
    +	/**
    +	 * Unregisters use of job-related BLOBs and allow them to be released.
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to
    +	 *
    +	 * @see #registerJob(JobID)
    +	 */
    +	public void releaseJob(JobID jobId) {
    --- End diff --
    
    I was just wondering, because what happens if it is negative and then this job is registered again? Will the counter then be at `<= 0`? This would mean that all blobs associated with this job will be deleted at the next clean up cycle, right?


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r141570317
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java ---
    @@ -0,0 +1,429 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.blob;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.FileUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Provides a cache for permanent BLOB files including a per-job ref-counting and a staged cleanup.
    + * <p>
    + * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache will first attempt to
    + * serve the file from its local cache. Only if the local cache does not contain the desired BLOB,
    + * it will try to download it from a distributed HA file system (if available) or the BLOB server.
    + * <p>
    + * If files for a job are not needed any more, they will enter a staged, i.e. deferred, cleanup.
    + * Files may thus still be be accessible upon recovery and do not need to be re-downloaded.
    + */
    +public class PermanentBlobCache extends TimerTask implements PermanentBlobService {
    +
    +	/** The log object used for debugging. */
    +	private static final Logger LOG = LoggerFactory.getLogger(PermanentBlobCache.class);
    +
    +	/** Counter to generate unique names for temporary files. */
    +	private final AtomicLong tempFileCounter = new AtomicLong(0);
    +
    +	private final InetSocketAddress serverAddress;
    +
    +	/** Root directory for local file storage */
    +	private final File storageDir;
    +
    +	/** Blob store for distributed file storage, e.g. in HA */
    +	private final BlobView blobView;
    +
    +	private final AtomicBoolean shutdownRequested = new AtomicBoolean();
    +
    +	/** Shutdown hook thread to ensure deletion of the storage directory. */
    +	private final Thread shutdownHook;
    +
    +	/** The number of retries when the transfer fails */
    +	private final int numFetchRetries;
    +
    +	/** Configuration for the blob client like ssl parameters required to connect to the blob server */
    +	private final Configuration blobClientConfig;
    +
    +	/** Lock guarding concurrent file accesses */
    +	private final ReadWriteLock readWriteLock;
    +
    +	// --------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Job reference counters with a time-to-live (TTL).
    +	 */
    +	@VisibleForTesting
    +	static class RefCount {
    +		/**
    +		 * Number of references to a job.
    +		 */
    +		public int references = 0;
    +
    +		/**
    +		 * Timestamp in milliseconds when any job data should be cleaned up (no cleanup for
    +		 * non-positive values).
    +		 */
    +		public long keepUntil = -1;
    +	}
    +
    +	/** Map to store the number of references to a specific job */
    +	private final Map<JobID, RefCount> jobRefCounters = new HashMap<>();
    +
    +	/** Time interval (ms) to run the cleanup task; also used as the default TTL. */
    +	private final long cleanupInterval;
    +
    +	private final Timer cleanupTimer;
    +
    +	/**
    +	 * Instantiates a new cache for permanent BLOBs which are also available in an HA store.
    +	 *
    +	 * @param serverAddress
    +	 * 		address of the {@link BlobServer} to use for fetching files from
    +	 * @param blobClientConfig
    +	 * 		global configuration
    +	 * @param blobView
    +	 * 		(distributed) HA blob store file system to retrieve files from first
    +	 *
    +	 * @throws IOException
    +	 * 		thrown if the (local or distributed) file storage cannot be created or is not usable
    +	 */
    +	public PermanentBlobCache(
    +		final InetSocketAddress serverAddress,
    +		final Configuration blobClientConfig,
    +		final BlobView blobView) throws IOException {
    +
    +		this.serverAddress = checkNotNull(serverAddress);
    +		this.blobClientConfig = checkNotNull(blobClientConfig);
    +		this.blobView = checkNotNull(blobView, "blobStore");
    +		this.readWriteLock = new ReentrantReadWriteLock();
    +
    +		// configure and create the storage directory
    +		String storageDirectory = blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY);
    +		this.storageDir = BlobUtils.initLocalStorageDirectory(storageDirectory);
    +		LOG.info("Created permanent BLOB cache storage directory " + storageDir);
    +
    +		// configure the number of fetch retries
    +		final int fetchRetries = blobClientConfig.getInteger(BlobServerOptions.FETCH_RETRIES);
    +		if (fetchRetries >= 0) {
    +			this.numFetchRetries = fetchRetries;
    +		} else {
    +			LOG.warn("Invalid value for {}. System will attempt no retries on failed fetch operations of BLOBs.",
    +				BlobServerOptions.FETCH_RETRIES.key());
    +			this.numFetchRetries = 0;
    +		}
    +
    +		// Initializing the clean up task
    +		this.cleanupTimer = new Timer(true);
    +
    +		this.cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
    +		this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
    +
    +		// Add shutdown hook to delete storage directory
    +		shutdownHook = BlobUtils.addShutdownHook(this, LOG);
    +	}
    +
    +	/**
    +	 * Registers use of job-related BLOBs.
    +	 * <p>
    +	 * Using any other method to access BLOBs, e.g. {@link #getHAFile}, is only valid within calls
    +	 * to <tt>registerJob(JobID)</tt> and {@link #releaseJob(JobID)}.
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to
    +	 *
    +	 * @see #releaseJob(JobID)
    +	 */
    +	public void registerJob(JobID jobId) {
    +		synchronized (jobRefCounters) {
    +			RefCount ref = jobRefCounters.get(jobId);
    +			if (ref == null) {
    +				ref = new RefCount();
    +				jobRefCounters.put(jobId, ref);
    +			} else {
    +				// reset cleanup timeout
    +				ref.keepUntil = -1;
    +			}
    +			++ref.references;
    +		}
    +	}
    +
    +	/**
    +	 * Unregisters use of job-related BLOBs and allow them to be released.
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to
    +	 *
    +	 * @see #registerJob(JobID)
    +	 */
    +	public void releaseJob(JobID jobId) {
    --- End diff --
    
    This behaviour changed with the last commits - the reference counter can now not go below 0 and issues a warning in case it would.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r137266698
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
    @@ -220,19 +260,27 @@ InputStream getInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOExcepti
     	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
     	 * @param blobKey
     	 * 		blob key associated with the requested file
    +	 * @param permanentBlob
    +	 * 		whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
     	 *
     	 * @throws IOException
     	 *         thrown if an I/O error occurs while writing the header data to the output stream
     	 */
    -	private static void sendGetHeader(OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey) throws IOException {
    +	private static void sendGetHeader(
    +			OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey, boolean permanentBlob)
    +			throws IOException {
     		checkNotNull(blobKey);
    +		checkArgument(jobId != null || !permanentBlob, "permanent BLOBs must be job-related");
     
     		// Signal type of operation
     		outputStream.write(GET_OPERATION);
     
     		// Send job ID and key
     		if (jobId == null) {
     			outputStream.write(CONTENT_NO_JOB);
    +		} else if (permanentBlob) {
    +			outputStream.write(CONTENT_FOR_JOB_HA);
    +			outputStream.write(jobId.getBytes());
     		} else {
     			outputStream.write(CONTENT_FOR_JOB);
    --- End diff --
    
    Same here to `TRANSIENT_JOB_CONTENT`?


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r137259798
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java ---
    @@ -257,7 +265,88 @@ public void testBlobServerRecovery() throws Exception {
     		try {
     			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
     
    -			BlobRecoveryITCase.testBlobServerRecovery(config, blobStoreService);
    +			BlobServerRecoveryTest.testBlobServerRecovery(config, blobStoreService);
    +		} finally {
    +			if (blobStoreService != null) {
    +				blobStoreService.closeAndCleanupAllData();
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed corrupted JARs are
    +	 * recognised during the download via a {@link org.apache.flink.runtime.blob.BlobServer}.
    +	 */
    +	@Test
    +	public void testBlobServerCorruptedFile() throws Exception {
    +		org.apache.flink.configuration.Configuration
    +			config = new org.apache.flink.configuration.Configuration();
    +		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
    +		config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
    +		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
    +			temporaryFolder.newFolder().getAbsolutePath());
    +		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
    +
    +		BlobStoreService blobStoreService = null;
    +
    +		try {
    +			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
    --- End diff --
    
    Why not creating the `blobStoreService` outside of the try-finally block. Then you don't have to make the null check in the finally block.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r139955371
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
    @@ -415,13 +393,17 @@ private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, int offset, int l
     	 * 		the ID of the job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
     	 * @param inputStream
     	 * 		the input stream to read the data from
    +	 * @param permanentBlob
    +	 * 		whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
     	 *
     	 * @return the computed BLOB key of the uploaded BLOB
     	 *
     	 * @throws IOException
     	 * 		thrown if an I/O error occurs while uploading the data to the BLOB server
     	 */
    -	private BlobKey putInputStream(@Nullable JobID jobId, InputStream inputStream) throws IOException {
    +	BlobKey putInputStream(@Nullable JobID jobId, InputStream inputStream, boolean permanentBlob)
    --- End diff --
    
    I'm not sure, it's worth it since this method is about to become private to `TransientBlobCache` once the BLOB upload is not necessary from the client anymore, i.e. once we upload the job in a single (HTTP) operation


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142139615
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java ---
    @@ -252,6 +280,36 @@ public void testJobDeferredCleanup() throws IOException, InterruptedException {
     	}
     
     	/**
    +	 * Checks that BLOBs for the given <tt>jobId</tt> are cleaned up eventually (after calling
    +	 * {@link PermanentBlobCache#releaseJob(JobID)}, which is not done by this method!) (waits at
    +	 * most 30s).
    +	 *
    +	 * @param cache
    +	 * 		BLOB server
    +	 * @param jobId
    +	 * 		job ID or <tt>null</tt> if job-unrelated
    +	 * @param keys
    +	 * 		keys identifying BLOBs which were previously registered for the <tt>jobId</tt>
    +	 */
    +	static void verifyJobCleanup(PermanentBlobCache cache, JobID jobId, List<BlobKey> keys)
    +		throws InterruptedException, IOException {
    +		// because we cannot guarantee that there are not thread races in the build system, we
    +		// loop for a certain while until the references disappear
    +		{
    +			long deadline = System.currentTimeMillis() + 30_000L;
    --- End diff --
    
    Shouldn't the deadline be configurable? `verifyJobCleanup` should not know how long the clean up takes (I guess it depends on the cleanup interval).


---

[GitHub] flink issue #4358: [FLINK-7068][blob] change BlobService sub-classes for per...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4358
  
    Hi Nico, I'm still waiting for Travis before merging FLINK-7056. In the meantime could you rebase this PR and squash all FLINK-7068 commits into one? Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r137302842
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
    @@ -174,7 +176,6 @@ public ServerSocket createSocket(int port) throws IOException {
     		// start the server thread
     		setName("BLOB Server listener at " + getPort());
     		setDaemon(true);
    -		start();
    --- End diff --
    
    Why did you pull `start` out of the constructor? Wouldn't one always want to start the `BlobServer` when creating it?


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142138008
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java ---
    @@ -0,0 +1,290 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.blob;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.FileUtils;
    +
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Provides a cache for permanent BLOB files including a per-job ref-counting and a staged cleanup.
    + *
    + * <p>When requesting BLOBs via {@link #getPermanentFile(JobID, BlobKey)}, the cache will first attempt to
    + * serve the file from its local cache. Only if the local cache does not contain the desired BLOB,
    + * it will try to download it from a distributed HA file system (if available) or the BLOB server.
    + *
    + * <p>If files for a job are not needed any more, they will enter a staged, i.e. deferred, cleanup.
    + * Files may thus still be be accessible upon recovery and do not need to be re-downloaded.
    + */
    +public class PermanentBlobCache extends AbstractBlobCache implements PermanentBlobService {
    +
    +	/**
    +	 * Job reference counters with a time-to-live (TTL).
    +	 */
    +	@VisibleForTesting
    +	static class RefCount {
    +		/**
    +		 * Number of references to a job.
    +		 */
    +		public int references = 0;
    +
    +		/**
    +		 * Timestamp in milliseconds when any job data should be cleaned up (no cleanup for
    +		 * non-positive values).
    +		 */
    +		public long keepUntil = -1;
    +	}
    +
    +	/**
    +	 * Map to store the number of references to a specific job.
    +	 */
    +	private final Map<JobID, RefCount> jobRefCounters = new HashMap<>();
    +
    +	/**
    +	 * Time interval (ms) to run the cleanup task; also used as the default TTL.
    +	 */
    +	private final long cleanupInterval;
    +
    +	private final Timer cleanupTimer;
    +
    +	/**
    +	 * Instantiates a new cache for permanent BLOBs which are also available in an HA store.
    +	 *
    +	 * @param serverAddress
    +	 * 		address of the {@link BlobServer} to use for fetching files from
    +	 * @param blobClientConfig
    +	 * 		global configuration
    +	 * @param blobView
    +	 * 		(distributed) HA blob store file system to retrieve files from first
    +	 *
    +	 * @throws IOException
    +	 * 		thrown if the (local or distributed) file storage cannot be created or is not usable
    +	 */
    +	public PermanentBlobCache(
    +			final InetSocketAddress serverAddress,
    +			final Configuration blobClientConfig,
    +			final BlobView blobView) throws IOException {
    +
    +		super(serverAddress, blobClientConfig, blobView,
    +			LoggerFactory.getLogger(PermanentBlobCache.class));
    +
    +		// Initializing the clean up task
    +		this.cleanupTimer = new Timer(true);
    +
    +		this.cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
    +		this.cleanupTimer.schedule(new PermanentBlobCleanupTask(), cleanupInterval, cleanupInterval);
    +	}
    +
    +	/**
    +	 * Registers use of job-related BLOBs.
    +	 *
    +	 * <p>Using any other method to access BLOBs, e.g. {@link #getPermanentFile}, is only valid within
    +	 * calls to <tt>registerJob(JobID)</tt> and {@link #releaseJob(JobID)}.
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to
    +	 *
    +	 * @see #releaseJob(JobID)
    +	 */
    +	public void registerJob(JobID jobId) {
    +		checkNotNull(jobId);
    +
    +		synchronized (jobRefCounters) {
    +			RefCount ref = jobRefCounters.get(jobId);
    +			if (ref == null) {
    +				ref = new RefCount();
    +				jobRefCounters.put(jobId, ref);
    +			} else {
    +				// reset cleanup timeout
    +				ref.keepUntil = -1;
    +			}
    +			++ref.references;
    +		}
    +	}
    +
    +	/**
    +	 * Unregisters use of job-related BLOBs and allow them to be released.
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to
    +	 *
    +	 * @see #registerJob(JobID)
    +	 */
    +	public void releaseJob(JobID jobId) {
    +		checkNotNull(jobId);
    +
    +		synchronized (jobRefCounters) {
    +			RefCount ref = jobRefCounters.get(jobId);
    +
    +			if (ref == null || ref.references == 0) {
    +				LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls for jobId " + jobId);
    +				return;
    +			}
    +
    +			--ref.references;
    +			if (ref.references == 0) {
    +				ref.keepUntil = System.currentTimeMillis() + cleanupInterval;
    +			}
    +		}
    +	}
    +
    +	public int getNumberOfReferenceHolders(JobID jobId) {
    +		checkNotNull(jobId);
    +
    +		synchronized (jobRefCounters) {
    +			RefCount ref = jobRefCounters.get(jobId);
    +			if (ref == null) {
    +				return 0;
    +			} else {
    +				return ref.references;
    +			}
    +		}
    +	}
    +
    +	public int getNumberOfCachedJobs() {
    +		return jobRefCounters.size();
    +	}
    +
    +	/**
    +	 * Returns the path to a local copy of the file associated with the provided job ID and blob
    +	 * key.
    +	 *
    +	 * <p>We will first attempt to serve the BLOB from the local storage. If the BLOB is not in
    +	 * there, we will try to download it from the HA store, or directly from the {@link BlobServer}.
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to
    +	 * @param key
    +	 * 		blob key associated with the requested file
    +	 *
    +	 * @return The path to the file.
    +	 *
    +	 * @throws java.io.FileNotFoundException
    +	 * 		if the BLOB does not exist;
    +	 * @throws IOException
    +	 * 		if any other error occurs when retrieving the file
    +	 */
    +	@Override
    +	public File getPermanentFile(JobID jobId, BlobKey key) throws IOException {
    +		checkNotNull(jobId);
    +		return getTransientFileInternal(jobId, key);
    --- End diff --
    
    Maybe we could rename `getTransientFileInternal` into `getFileInternal`.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142136764
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
    @@ -407,4 +419,72 @@ static void closeSilently(Socket socket, Logger LOG) {
     	private BlobUtils() {
     		throw new RuntimeException();
     	}
    +
    +	/**
    +	 * Moves the temporary <tt>incomingFile</tt> to its permanent location where it is available for
    +	 * use.
    +	 *
    +	 * @param incomingFile
    +	 * 		temporary file created during transfer
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to or <tt>null</tt> if job-unrelated
    +	 * @param blobKey
    +	 * 		BLOB key identifying the file
    +	 * @param storageFile
    +	 *      (local) file where the blob is/should be stored
    +	 * @param writeLock
    +	 *      lock to acquire before doing the move
    +	 * @param log
    +	 *      logger for debug information
    +	 * @param blobStore
    +	 *      HA store (or <tt>null</tt> if unavailable)
    +	 *
    +	 * @throws IOException
    +	 * 		thrown if an I/O error occurs while moving the file or uploading it to the HA store
    +	 */
    +	static void moveTempFileToStore(
    +			File incomingFile, @Nullable JobID jobId, BlobKey blobKey, File storageFile,
    +			Lock writeLock, Logger log, @Nullable BlobStore blobStore) throws IOException {
    +
    +		writeLock.lock();
    +
    +		try {
    +			// first check whether the file already exists
    +			if (!storageFile.exists()) {
    +				try {
    +					// only move the file if it does not yet exist
    +					Files.move(incomingFile.toPath(), storageFile.toPath());
    +
    +					incomingFile = null;
    +
    +				} catch (FileAlreadyExistsException ignored) {
    +					log.warn("Detected concurrent file modifications. This should only happen if multiple" +
    +						"BlobServer use the same storage directory.");
    +					// we cannot be sure at this point whether the file has already been uploaded to the blob
    +					// store or not. Even if the blobStore might shortly be in an inconsistent state, we have
    --- End diff --
    
    "we have to"


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r141405909
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
    @@ -174,7 +176,6 @@ public ServerSocket createSocket(int port) throws IOException {
     		// start the server thread
     		setName("BLOB Server listener at " + getPort());
     		setDaemon(true);
    -		start();
    --- End diff --
    
    Makes sense.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r137291080
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java ---
    @@ -0,0 +1,322 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.blob;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.FileUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.net.InetSocketAddress;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Provides access to transient BLOB files stored at the {@link BlobServer}.
    + *
    + * TODO: currently, this is still cache-based with local copies - make this truly transient, i.e. return file streams with no local copy
    + */
    +public class TransientBlobCache implements TransientBlobService {
    +
    +	/** The log object used for debugging. */
    +	private static final Logger LOG = LoggerFactory.getLogger(TransientBlobCache.class);
    +
    +	/** Counter to generate unique names for temporary files. */
    +	private final AtomicLong tempFileCounter = new AtomicLong(0);
    +
    +	private final InetSocketAddress serverAddress;
    +
    +	/**
    +	 * Root directory for local file storage
    +	 */
    +	private final File storageDir;
    +
    +	private final AtomicBoolean shutdownRequested = new AtomicBoolean();
    +
    +	/** Shutdown hook thread to ensure deletion of the local storage directory. */
    +	private final Thread shutdownHook;
    +
    +	/** The number of retries when the transfer fails */
    +	private final int numFetchRetries;
    +
    +	/** Configuration for the blob client like ssl parameters required to connect to the blob server */
    +	private final Configuration blobClientConfig;
    +
    +	/** Lock guarding concurrent file accesses */
    +	private final ReadWriteLock readWriteLock;
    +
    +	/**
    +	 * Instantiates a new BLOB cache.
    +	 *
    +	 * @param serverAddress
    +	 * 		address of the {@link BlobServer} to use for fetching files from
    +	 * @param blobClientConfig
    +	 * 		global configuration
    +	 *
    +	 * @throws IOException
    +	 * 		thrown if the (local or distributed) file storage cannot be created or is not usable
    +	 */
    +	public TransientBlobCache(
    +			final InetSocketAddress serverAddress,
    +			final Configuration blobClientConfig) throws IOException {
    --- End diff --
    
    Can we change it such that we don't pass in a `Configuration` object but instead the required values?


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r137293376
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -215,10 +215,10 @@ public static ClassLoader retrieveClassLoader(
     			JobManagerMessages.ClassloadingProps props = optProps.get();
     
     			InetSocketAddress serverAddress = new InetSocketAddress(jobManager.getHostname(), props.blobManagerPort());
    -			final BlobCache blobClient;
    +			final PermanentBlobCache blobClient;
     			try {
    -				// TODO: Fix lifecycle of BlobCache to properly close it upon usage
    -				blobClient = new BlobCache(serverAddress, config, highAvailabilityServices.createBlobStore());
    +				// TODO: Fix lifecycle of PermanentBlobCache to properly close it upon usage
    +				blobClient = new PermanentBlobCache(serverAddress, config, highAvailabilityServices.createBlobStore());
    --- End diff --
    
    shouldn't the variable be called `permanentBlobCache` instead of `blobClient`?


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r141406045
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
    @@ -389,95 +413,332 @@ public File getFile(JobID jobId, BlobKey key) throws IOException {
     	 *
     	 * @param jobId
     	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
    -	 * @param requiredBlob
    +	 * @param blobKey
     	 * 		blob key associated with the requested file
    +	 * @param highlyAvailable
    +	 * 		whether to the requested file is highly available (HA)
     	 *
     	 * @return file referring to the local storage location of the BLOB
     	 *
     	 * @throws IOException
     	 * 		Thrown if the file retrieval failed.
     	 */
    -	private File getFileInternal(@Nullable JobID jobId, BlobKey requiredBlob) throws IOException {
    -		checkArgument(requiredBlob != null, "BLOB key cannot be null.");
    +	private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable) throws IOException {
    +		checkArgument(blobKey != null, "BLOB key cannot be null.");
     
    -		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, requiredBlob);
    +		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
    +		readWriteLock.readLock().lock();
     
    -		if (localFile.exists()) {
    +		try {
    +			getFileInternal(jobId, blobKey, highlyAvailable, localFile);
     			return localFile;
    +		} finally {
    +			readWriteLock.readLock().unlock();
     		}
    -		else {
    +	}
    +
    +	/**
    +	 * Helper to retrieve the local path of a file associated with a job and a blob key.
    +	 * <p>
    +	 * The blob server looks the blob key up in its local storage. If the file exists, it is
    +	 * returned. If the file does not exist, it is retrieved from the HA blob store (if available)
    +	 * or a {@link FileNotFoundException} is thrown.
    +	 * <p>
    +	 * <strong>Assumes the read lock has already been acquired.</strong>
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
    +	 * @param blobKey
    +	 * 		blob key associated with the requested file
    +	 * @param highlyAvailable
    +	 * 		whether to the requested file is highly available (HA)
    +	 * @param localFile
    +	 *      (local) file where the blob is/should be stored
    +	 *
    +	 * @throws IOException
    +	 * 		Thrown if the file retrieval failed.
    +	 */
    +	void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable, File localFile) throws IOException {
    +		// assume readWriteLock.readLock() was already locked (cannot really check that)
    +
    +		if (localFile.exists()) {
    +			return;
    +		} else if (highlyAvailable) {
    +			// Try the HA blob store
    +			// first we have to release the read lock in order to acquire the write lock
    +			readWriteLock.readLock().unlock();
    +
    +			// use a temporary file (thread-safe without locking)
    +			File incomingFile = null;
     			try {
    -				// Try the blob store
    -				blobStore.get(jobId, requiredBlob, localFile);
    +				incomingFile = createTemporaryFilename();
    +				blobStore.get(jobId, blobKey, incomingFile);
    +
    +				BlobUtils.moveTempFileToStore(
    +					incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), LOG, null);
    --- End diff --
    
    Alright.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142126654
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.blob;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.FileUtils;
    +
    +import org.slf4j.Logger;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.Closeable;
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Abstract base class for permanent and transient BLOB files.
    + */
    +public abstract class AbstractBlobCache implements Closeable {
    +
    +	/**
    +	 * The log object used for debugging.
    +	 */
    +	protected final Logger LOG;
    +
    +	/**
    +	 * Counter to generate unique names for temporary files.
    +	 */
    +	protected final AtomicLong tempFileCounter = new AtomicLong(0);
    +
    +	protected final InetSocketAddress serverAddress;
    +
    +	/**
    +	 * Root directory for local file storage.
    +	 */
    +	protected final File storageDir;
    +
    +	/**
    +	 * Blob store for distributed file storage, e.g. in HA.
    +	 */
    +	protected final BlobView blobView;
    +
    +	protected final AtomicBoolean shutdownRequested = new AtomicBoolean();
    +
    +	/**
    +	 * Shutdown hook thread to ensure deletion of the local storage directory.
    +	 */
    +	protected final Thread shutdownHook;
    +
    +	/**
    +	 * The number of retries when the transfer fails.
    +	 */
    +	protected final int numFetchRetries;
    +
    +	/**
    +	 * Configuration for the blob client like ssl parameters required to connect to the blob
    +	 * server.
    +	 */
    +	protected final Configuration blobClientConfig;
    +
    +	/**
    +	 * Lock guarding concurrent file accesses.
    +	 */
    +	protected final ReadWriteLock readWriteLock;
    +
    +	public AbstractBlobCache(
    +			final InetSocketAddress serverAddress,
    +			final Configuration blobClientConfig,
    +			final BlobView blobView,
    +			final Logger logger) throws IOException {
    +
    +		this.LOG = logger;
    +
    +		this.serverAddress = checkNotNull(serverAddress);
    +		this.blobClientConfig = checkNotNull(blobClientConfig);
    +		this.blobView = checkNotNull(blobView, "blobStore");
    --- End diff --
    
    `blobStore` != `blobView`


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142635118
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java ---
    @@ -83,30 +113,34 @@ public boolean equals(final Object obj) {
     
     		final BlobKey bk = (BlobKey) obj;
     
    -		return Arrays.equals(this.key, bk.key);
    +		return Arrays.equals(this.key, bk.key) && this.type == bk.type;
     	}
     
     	@Override
     	public int hashCode() {
    -		return Arrays.hashCode(this.key);
    +		return Arrays.hashCode(this.key) + this.type.hashCode();
    --- End diff --
    
    ok, I wasn't quite sure and based my code on some examples from the current Flink code. Apparently, I took the wrong ones.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r137270473
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
    @@ -389,95 +413,332 @@ public File getFile(JobID jobId, BlobKey key) throws IOException {
     	 *
     	 * @param jobId
     	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
    -	 * @param requiredBlob
    +	 * @param blobKey
     	 * 		blob key associated with the requested file
    +	 * @param highlyAvailable
    +	 * 		whether to the requested file is highly available (HA)
     	 *
     	 * @return file referring to the local storage location of the BLOB
     	 *
     	 * @throws IOException
     	 * 		Thrown if the file retrieval failed.
     	 */
    -	private File getFileInternal(@Nullable JobID jobId, BlobKey requiredBlob) throws IOException {
    -		checkArgument(requiredBlob != null, "BLOB key cannot be null.");
    +	private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable) throws IOException {
    --- End diff --
    
    Maybe we could introduce an enum here as well for the `highlyAvailable` boolean argument.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r139926257
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java ---
    @@ -257,7 +265,88 @@ public void testBlobServerRecovery() throws Exception {
     		try {
     			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
     
    -			BlobRecoveryITCase.testBlobServerRecovery(config, blobStoreService);
    +			BlobServerRecoveryTest.testBlobServerRecovery(config, blobStoreService);
    +		} finally {
    +			if (blobStoreService != null) {
    +				blobStoreService.closeAndCleanupAllData();
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed corrupted JARs are
    +	 * recognised during the download via a {@link org.apache.flink.runtime.blob.BlobServer}.
    +	 */
    +	@Test
    +	public void testBlobServerCorruptedFile() throws Exception {
    +		org.apache.flink.configuration.Configuration
    +			config = new org.apache.flink.configuration.Configuration();
    +		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
    +		config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
    --- End diff --
    
    I'm pretty sure, this should have been `FILESYSTEM` and I was just to eager to change everything to `ZOOKEEPER` when copying the tests over... Anyway, this option is not used at all because we only test the BLOB service classes (also in the test classes I copied this from).


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142129505
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
    @@ -120,6 +125,76 @@ public BlobClient(InetSocketAddress serverAddress, Configuration clientConfig) t
     		}
     	}
     
    +	/**
    +	 * Downloads the given BLOB from the given server and stores its contents to a (local) file.
    +	 *
    +	 * <p>Transient BLOB files are deleted after a successful copy of the server's data into the
    +	 * given <tt>localJarFile</tt>.
    +	 *
    +	 * @param jobId
    +	 * 		job ID the BLOB belongs to or <tt>null</tt> if job-unrelated
    +	 * @param blobKey
    +	 * 		BLOB key
    +	 * @param localJarFile
    +	 * 		the local file to write to
    +	 * @param serverAddress
    +	 * 		address of the server to download from
    +	 * @param blobClientConfig
    +	 * 		client configuration for the connection
    +	 * @param numFetchRetries
    +	 * 		number of retries before failing
    +	 *
    +	 * @throws IOException
    +	 * 		if an I/O error occurs during the download
    +	 */
    +	static void downloadFromBlobServer(
    +			@Nullable JobID jobId, BlobKey blobKey, File localJarFile,
    +			InetSocketAddress serverAddress, Configuration blobClientConfig, int numFetchRetries)
    +			throws IOException {
    +
    +		final byte[] buf = new byte[BUFFER_SIZE];
    +		LOG.info("Downloading {}/{} from {}", jobId, blobKey, serverAddress);
    +
    +		// loop over retries
    +		int attempt = 0;
    +		while (true) {
    +			try (
    +				final BlobClient bc = new BlobClient(serverAddress, blobClientConfig);
    +				final InputStream is = bc.getInternal(jobId, blobKey);
    +				final OutputStream os = new FileOutputStream(localJarFile)
    +			) {
    +				while (true) {
    +					final int read = is.read(buf);
    +					if (read < 0) {
    +						break;
    +					}
    +					os.write(buf, 0, read);
    +				}
    +
    +				return;
    +			}
    +			catch (Throwable t) {
    +				String message = "Failed to fetch BLOB " + jobId + "/" + blobKey + " from " + serverAddress +
    +					" and store it under " + localJarFile.getAbsolutePath();
    +				if (attempt < numFetchRetries) {
    +					if (LOG.isDebugEnabled()) {
    +						LOG.debug(message + " Retrying...", t);
    --- End diff --
    
    Shouldn't this also be an error if the other log statement is an error as well?


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142133777
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
    @@ -375,7 +378,32 @@ public File getFile(BlobKey key) throws IOException {
     	 * 		Thrown if the file retrieval failed.
     	 */
     	@Override
    -	public File getFile(JobID jobId, BlobKey key) throws IOException {
    +	public File getTransientFile(JobID jobId, BlobKey key) throws IOException {
    +		checkNotNull(jobId);
    +		return getFileInternal(jobId, key);
    +	}
    +
    +	/**
    +	 * Returns the path to a local copy of the file associated with the provided job ID and blob
    +	 * key.
    +	 * <p>
    +	 * We will first attempt to serve the BLOB from the local storage. If the BLOB is not in
    +	 * there, we will try to download it from the HA store.
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to
    +	 * @param key
    +	 * 		blob key associated with the requested file
    +	 *
    +	 * @return The path to the file.
    +	 *
    +	 * @throws java.io.FileNotFoundException
    +	 * 		if the BLOB does not exist;
    +	 * @throws IOException
    +	 * 		if any other error occurs when retrieving the file
    +	 */
    +	@Override
    +	public File getPermanentFile(JobID jobId, BlobKey key) throws IOException {
    --- End diff --
    
    Same here for `BlobType.PERMANENT`


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142138928
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -468,7 +468,7 @@ class JobManager(
               taskManagerGateway match {
                 case x: ActorTaskManagerGateway =>
                   handleTaskManagerTerminated(x.getActorGateway().actor(), instance.getId)
    -            case _ => log.debug(s"Cannot remove resource ${resourceID}, because there is " +
    +            case _ => log.debug(s"Cannot remove reosurce ${resourceID}, because there is " +
    --- End diff --
    
    typo resource


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142128412
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
    @@ -18,89 +18,21 @@
     
     package org.apache.flink.runtime.blob;
     
    -import org.apache.flink.annotation.VisibleForTesting;
    -import org.apache.flink.api.common.JobID;
    -import org.apache.flink.configuration.BlobServerOptions;
     import org.apache.flink.configuration.Configuration;
    -import org.apache.flink.util.FileUtils;
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
     
    -import javax.annotation.Nullable;
    -import java.io.File;
    -import java.io.FileOutputStream;
     import java.io.IOException;
    -import java.io.InputStream;
    -import java.io.OutputStream;
     import java.net.InetSocketAddress;
    -import java.util.HashMap;
    -import java.util.Iterator;
    -import java.util.Map;
    -import java.util.Timer;
    -import java.util.TimerTask;
    -import java.util.concurrent.atomic.AtomicBoolean;
    -
    -import static org.apache.flink.util.Preconditions.checkArgument;
    -import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
    - * The BLOB cache implements a local cache for content-addressable BLOBs.
    - *
    - * <p>When requesting BLOBs through the {@link BlobCache#getFile} methods, the
    - * BLOB cache will first attempt to serve the file from its local cache. Only if
    - * the local cache does not contain the desired BLOB, the BLOB cache will try to
    - * download it from a distributed file system (if available) or the BLOB
    - * server.</p>
    + * The BLOB cache provides access to BLOB services for permanent and transient BLOBs.
      */
    -public class BlobCache extends TimerTask implements BlobService {
    -
    -	/** The log object used for debugging. */
    -	private static final Logger LOG = LoggerFactory.getLogger(BlobCache.class);
    -
    -	private final InetSocketAddress serverAddress;
    -
    -	/** Root directory for local file storage */
    -	private final File storageDir;
    -
    -	/** Blob store for distributed file storage, e.g. in HA */
    -	private final BlobView blobView;
    -
    -	private final AtomicBoolean shutdownRequested = new AtomicBoolean();
    -
    -	/** Shutdown hook thread to ensure deletion of the storage directory. */
    -	private final Thread shutdownHook;
    -
    -	/** The number of retries when the transfer fails */
    -	private final int numFetchRetries;
    -
    -	/** Configuration for the blob client like ssl parameters required to connect to the blob server */
    -	private final Configuration blobClientConfig;
    -
    -	// --------------------------------------------------------------------------------------------
    -
    -	/**
    -	 * Job reference counters with a time-to-live (TTL).
    -	 */
    -	private static class RefCount {
    -		/**
    -		 * Number of references to a job.
    -		 */
    -		public int references = 0;
    -		
    -		/**
    -		 * Timestamp in milliseconds when any job data should be cleaned up (no cleanup for
    -		 * non-positive values).
    -		 */
    -		public long keepUntil = -1;
    -	}
    -
    -	/** Map to store the number of references to a specific job */
    -	private final Map<JobID, RefCount> jobRefCounters = new HashMap<>();
    +public class BlobCache implements BlobService {
     
    -	/** Time interval (ms) to run the cleanup task; also used as the default TTL. */
    -	private final long cleanupInterval;
    +	/** Caching store for permanent BLOBs. */
    +	private final PermanentBlobCache permanentBlobStore;
    --- End diff --
    
    Maybe we could rename `permanentBlobStore` -> `permanentBlobCache`.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r137292784
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobService.java ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.blob;
    +
    +import org.apache.flink.api.common.JobID;
    +
    +import java.io.Closeable;
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +
    +/**
    + * A service to retrieve transient binary large objects (BLOBs).
    + * <p>
    + * These include per-job BLOBs that are , e.g. a job's JAR files, parts of an off-loaded {@link
    --- End diff --
    
    I thought that job's JAR files are stored in the permanent blob service?


---

[GitHub] flink issue #4358: [FLINK-7068][blob] change BlobService sub-classes for per...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4358
  
    sorry for the mess, but let me also drag in #4568 and adapt the code in here (which is moved from `BlobCache` to `PermanentBlobCache` by this PR)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r137291176
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java ---
    @@ -0,0 +1,322 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.blob;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.FileUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.net.InetSocketAddress;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Provides access to transient BLOB files stored at the {@link BlobServer}.
    + *
    + * TODO: currently, this is still cache-based with local copies - make this truly transient, i.e. return file streams with no local copy
    + */
    +public class TransientBlobCache implements TransientBlobService {
    +
    +	/** The log object used for debugging. */
    +	private static final Logger LOG = LoggerFactory.getLogger(TransientBlobCache.class);
    +
    +	/** Counter to generate unique names for temporary files. */
    +	private final AtomicLong tempFileCounter = new AtomicLong(0);
    +
    +	private final InetSocketAddress serverAddress;
    +
    +	/**
    +	 * Root directory for local file storage
    +	 */
    +	private final File storageDir;
    +
    +	private final AtomicBoolean shutdownRequested = new AtomicBoolean();
    +
    +	/** Shutdown hook thread to ensure deletion of the local storage directory. */
    +	private final Thread shutdownHook;
    +
    +	/** The number of retries when the transfer fails */
    +	private final int numFetchRetries;
    +
    +	/** Configuration for the blob client like ssl parameters required to connect to the blob server */
    +	private final Configuration blobClientConfig;
    +
    +	/** Lock guarding concurrent file accesses */
    +	private final ReadWriteLock readWriteLock;
    +
    +	/**
    +	 * Instantiates a new BLOB cache.
    +	 *
    +	 * @param serverAddress
    +	 * 		address of the {@link BlobServer} to use for fetching files from
    +	 * @param blobClientConfig
    +	 * 		global configuration
    +	 *
    +	 * @throws IOException
    +	 * 		thrown if the (local or distributed) file storage cannot be created or is not usable
    +	 */
    +	public TransientBlobCache(
    +			final InetSocketAddress serverAddress,
    +			final Configuration blobClientConfig) throws IOException {
    +
    +		this.serverAddress = checkNotNull(serverAddress);
    +		this.blobClientConfig = checkNotNull(blobClientConfig);
    +		this.readWriteLock = new ReentrantReadWriteLock();
    +
    +		// configure and create the storage directory
    +		String storageDirectory = blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY);
    +		this.storageDir = BlobUtils.initLocalStorageDirectory(storageDirectory);
    +		LOG.info("Created transient BLOB cache storage directory " + storageDir);
    +
    +		// configure the number of fetch retries
    +		final int fetchRetries = blobClientConfig.getInteger(BlobServerOptions.FETCH_RETRIES);
    +		if (fetchRetries >= 0) {
    +			this.numFetchRetries = fetchRetries;
    +		} else {
    +			LOG.warn("Invalid value for {}. System will attempt no retries on failed fetches of BLOBs.",
    +				BlobServerOptions.FETCH_RETRIES.key());
    +			this.numFetchRetries = 0;
    +		}
    +
    +		// Add shutdown hook to delete storage directory
    +		shutdownHook = BlobUtils.addShutdownHook(this, LOG);
    +	}
    +
    +	@Override
    +	public File getFile(BlobKey key) throws IOException {
    +		return getFileInternal(null, key);
    +	}
    +
    +	@Override
    +	public File getFile(JobID jobId, BlobKey key) throws IOException {
    +		checkNotNull(jobId);
    +		return getFileInternal(jobId, key);
    +	}
    +
    +	/**
    +	 * Returns local copy of the file for the BLOB with the given key.
    +	 * <p>
    +	 * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in
    +	 * the cache, the method will try to download it from this cache's BLOB server.
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
    +	 * @param blobKey
    +	 * 		The key of the desired BLOB.
    +	 *
    +	 * @return file referring to the local storage location of the BLOB.
    +	 *
    +	 * @throws IOException
    +	 * 		Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
    +	 */
    +	private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOException {
    +		checkArgument(blobKey != null, "BLOB key cannot be null.");
    +
    +		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
    +		readWriteLock.readLock().lock();
    +
    +		try {
    +			if (localFile.exists()) {
    +				return localFile;
    +			}
    +		} finally {
    +			readWriteLock.readLock().unlock();
    +		}
    +
    +		// download from the BlobServer directly
    +		// use a temporary file (thread-safe without locking)
    +		File incomingFile = createTemporaryFilename();
    +		try {
    +			BlobClient.downloadFromBlobServer(jobId, blobKey, false, incomingFile, serverAddress,
    +				blobClientConfig, numFetchRetries);
    --- End diff --
    
    When do we delete the transient file?


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r137268133
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
    @@ -601,7 +564,39 @@ public void deleteInternal(@Nullable JobID jobId, BlobKey key) throws IOExceptio
     	}
     
     	/**
    -	 * Uploads the JAR files to a {@link BlobServer} at the given address.
    +	 * Reads the response from the input stream and throws in case of errors
    +	 *
    +	 * @param is
    +	 * 		stream to read from
    +	 *
    +	 * @return  <tt>true</tt> if the delete operation was successful at the {@link BlobServer};
    +	 *          <tt>false</tt> otherwise
    +	 *
    +	 * @throws IOException
    +	 * 		if the server code throws an exception or if reading the response failed
    +	 */
    +	private static boolean receiveAndCheckDeleteResponse(InputStream is) throws IOException {
    +		int response = is.read();
    +		if (response < 0) {
    +			throw new EOFException("Premature end of response");
    +		}
    +		if (response == RETURN_ERROR) {
    +			Throwable cause = readExceptionFromStream(is);
    +			if (cause == null) {
    +				return false;
    +			} else {
    +				throw new IOException("Server side error: " + cause.getMessage(), cause);
    --- End diff --
    
    I think we don't have to append the cause message to `IOException's` message, because it is included in the `cause`.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r137268826
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
    @@ -415,13 +393,17 @@ private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, int offset, int l
     	 * 		the ID of the job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
     	 * @param inputStream
     	 * 		the input stream to read the data from
    +	 * @param permanentBlob
    +	 * 		whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
     	 *
     	 * @return the computed BLOB key of the uploaded BLOB
     	 *
     	 * @throws IOException
     	 * 		thrown if an I/O error occurs while uploading the data to the BLOB server
     	 */
    -	private BlobKey putInputStream(@Nullable JobID jobId, InputStream inputStream) throws IOException {
    +	BlobKey putInputStream(@Nullable JobID jobId, InputStream inputStream, boolean permanentBlob)
    --- End diff --
    
    Should we introduce an `enum` instead of a boolean denoting whether the content is transient or permanent? This would have the advantage that it's much clearer what's happening when looking at code at the calling side.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r137291887
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java ---
    @@ -0,0 +1,322 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.blob;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.FileUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.net.InetSocketAddress;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Provides access to transient BLOB files stored at the {@link BlobServer}.
    + *
    + * TODO: currently, this is still cache-based with local copies - make this truly transient, i.e. return file streams with no local copy
    + */
    +public class TransientBlobCache implements TransientBlobService {
    +
    +	/** The log object used for debugging. */
    +	private static final Logger LOG = LoggerFactory.getLogger(TransientBlobCache.class);
    +
    +	/** Counter to generate unique names for temporary files. */
    +	private final AtomicLong tempFileCounter = new AtomicLong(0);
    +
    +	private final InetSocketAddress serverAddress;
    +
    +	/**
    +	 * Root directory for local file storage
    +	 */
    +	private final File storageDir;
    +
    +	private final AtomicBoolean shutdownRequested = new AtomicBoolean();
    +
    +	/** Shutdown hook thread to ensure deletion of the local storage directory. */
    +	private final Thread shutdownHook;
    +
    +	/** The number of retries when the transfer fails */
    +	private final int numFetchRetries;
    +
    +	/** Configuration for the blob client like ssl parameters required to connect to the blob server */
    +	private final Configuration blobClientConfig;
    +
    +	/** Lock guarding concurrent file accesses */
    +	private final ReadWriteLock readWriteLock;
    +
    +	/**
    +	 * Instantiates a new BLOB cache.
    +	 *
    +	 * @param serverAddress
    +	 * 		address of the {@link BlobServer} to use for fetching files from
    +	 * @param blobClientConfig
    +	 * 		global configuration
    +	 *
    +	 * @throws IOException
    +	 * 		thrown if the (local or distributed) file storage cannot be created or is not usable
    +	 */
    +	public TransientBlobCache(
    +			final InetSocketAddress serverAddress,
    +			final Configuration blobClientConfig) throws IOException {
    --- End diff --
    
    Probably not so easy because we also need it for the `BlobClient` creation.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r139966738
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java ---
    @@ -0,0 +1,429 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.blob;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.FileUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Provides a cache for permanent BLOB files including a per-job ref-counting and a staged cleanup.
    + * <p>
    + * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache will first attempt to
    + * serve the file from its local cache. Only if the local cache does not contain the desired BLOB,
    + * it will try to download it from a distributed HA file system (if available) or the BLOB server.
    + * <p>
    + * If files for a job are not needed any more, they will enter a staged, i.e. deferred, cleanup.
    + * Files may thus still be be accessible upon recovery and do not need to be re-downloaded.
    + */
    +public class PermanentBlobCache extends TimerTask implements PermanentBlobService {
    --- End diff --
    
    The latter is not possible since each class has its uniquely offered methods, e.g. `TransientBlobCache` allows deleting BLOBs, while `PermanentBlobCache` does not. I can come up with a common base class though.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142133726
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
    @@ -375,7 +378,32 @@ public File getFile(BlobKey key) throws IOException {
     	 * 		Thrown if the file retrieval failed.
     	 */
     	@Override
    -	public File getFile(JobID jobId, BlobKey key) throws IOException {
    +	public File getTransientFile(JobID jobId, BlobKey key) throws IOException {
    +		checkNotNull(jobId);
    --- End diff --
    
    Should we check that `key` equals actually the `BlobType.TRANSIENT`?


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142131457
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java ---
    @@ -83,30 +113,34 @@ public boolean equals(final Object obj) {
     
     		final BlobKey bk = (BlobKey) obj;
     
    -		return Arrays.equals(this.key, bk.key);
    +		return Arrays.equals(this.key, bk.key) && this.type == bk.type;
     	}
     
     	@Override
     	public int hashCode() {
    -		return Arrays.hashCode(this.key);
    +		return Arrays.hashCode(this.key) + this.type.hashCode();
    --- End diff --
    
    I think when one combines field, then one should multiply the fields with a prime, e.g. `37`.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142128358
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
    @@ -18,89 +18,21 @@
     
     package org.apache.flink.runtime.blob;
     
    -import org.apache.flink.annotation.VisibleForTesting;
    -import org.apache.flink.api.common.JobID;
    -import org.apache.flink.configuration.BlobServerOptions;
     import org.apache.flink.configuration.Configuration;
    -import org.apache.flink.util.FileUtils;
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
     
    -import javax.annotation.Nullable;
    -import java.io.File;
    -import java.io.FileOutputStream;
     import java.io.IOException;
    -import java.io.InputStream;
    -import java.io.OutputStream;
     import java.net.InetSocketAddress;
    -import java.util.HashMap;
    -import java.util.Iterator;
    -import java.util.Map;
    -import java.util.Timer;
    -import java.util.TimerTask;
    -import java.util.concurrent.atomic.AtomicBoolean;
    -
    -import static org.apache.flink.util.Preconditions.checkArgument;
    -import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
    - * The BLOB cache implements a local cache for content-addressable BLOBs.
    - *
    - * <p>When requesting BLOBs through the {@link BlobCache#getFile} methods, the
    - * BLOB cache will first attempt to serve the file from its local cache. Only if
    - * the local cache does not contain the desired BLOB, the BLOB cache will try to
    - * download it from a distributed file system (if available) or the BLOB
    - * server.</p>
    + * The BLOB cache provides access to BLOB services for permanent and transient BLOBs.
      */
    -public class BlobCache extends TimerTask implements BlobService {
    -
    -	/** The log object used for debugging. */
    -	private static final Logger LOG = LoggerFactory.getLogger(BlobCache.class);
    -
    -	private final InetSocketAddress serverAddress;
    -
    -	/** Root directory for local file storage */
    -	private final File storageDir;
    -
    -	/** Blob store for distributed file storage, e.g. in HA */
    -	private final BlobView blobView;
    -
    -	private final AtomicBoolean shutdownRequested = new AtomicBoolean();
    -
    -	/** Shutdown hook thread to ensure deletion of the storage directory. */
    -	private final Thread shutdownHook;
    -
    -	/** The number of retries when the transfer fails */
    -	private final int numFetchRetries;
    -
    -	/** Configuration for the blob client like ssl parameters required to connect to the blob server */
    -	private final Configuration blobClientConfig;
    -
    -	// --------------------------------------------------------------------------------------------
    -
    -	/**
    -	 * Job reference counters with a time-to-live (TTL).
    -	 */
    -	private static class RefCount {
    -		/**
    -		 * Number of references to a job.
    -		 */
    -		public int references = 0;
    -		
    -		/**
    -		 * Timestamp in milliseconds when any job data should be cleaned up (no cleanup for
    -		 * non-positive values).
    -		 */
    -		public long keepUntil = -1;
    -	}
    -
    -	/** Map to store the number of references to a specific job */
    -	private final Map<JobID, RefCount> jobRefCounters = new HashMap<>();
    +public class BlobCache implements BlobService {
    --- End diff --
    
    Could this be called `BlobServiceImpl`?


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r139969639
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java ---
    @@ -0,0 +1,322 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.blob;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.FileUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.net.InetSocketAddress;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Provides access to transient BLOB files stored at the {@link BlobServer}.
    + *
    + * TODO: currently, this is still cache-based with local copies - make this truly transient, i.e. return file streams with no local copy
    + */
    +public class TransientBlobCache implements TransientBlobService {
    +
    +	/** The log object used for debugging. */
    +	private static final Logger LOG = LoggerFactory.getLogger(TransientBlobCache.class);
    +
    +	/** Counter to generate unique names for temporary files. */
    +	private final AtomicLong tempFileCounter = new AtomicLong(0);
    +
    +	private final InetSocketAddress serverAddress;
    +
    +	/**
    +	 * Root directory for local file storage
    +	 */
    +	private final File storageDir;
    +
    +	private final AtomicBoolean shutdownRequested = new AtomicBoolean();
    +
    +	/** Shutdown hook thread to ensure deletion of the local storage directory. */
    +	private final Thread shutdownHook;
    +
    +	/** The number of retries when the transfer fails */
    +	private final int numFetchRetries;
    +
    +	/** Configuration for the blob client like ssl parameters required to connect to the blob server */
    +	private final Configuration blobClientConfig;
    +
    +	/** Lock guarding concurrent file accesses */
    +	private final ReadWriteLock readWriteLock;
    +
    +	/**
    +	 * Instantiates a new BLOB cache.
    +	 *
    +	 * @param serverAddress
    +	 * 		address of the {@link BlobServer} to use for fetching files from
    +	 * @param blobClientConfig
    +	 * 		global configuration
    +	 *
    +	 * @throws IOException
    +	 * 		thrown if the (local or distributed) file storage cannot be created or is not usable
    +	 */
    +	public TransientBlobCache(
    +			final InetSocketAddress serverAddress,
    +			final Configuration blobClientConfig) throws IOException {
    +
    +		this.serverAddress = checkNotNull(serverAddress);
    +		this.blobClientConfig = checkNotNull(blobClientConfig);
    +		this.readWriteLock = new ReentrantReadWriteLock();
    +
    +		// configure and create the storage directory
    +		String storageDirectory = blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY);
    +		this.storageDir = BlobUtils.initLocalStorageDirectory(storageDirectory);
    +		LOG.info("Created transient BLOB cache storage directory " + storageDir);
    +
    +		// configure the number of fetch retries
    +		final int fetchRetries = blobClientConfig.getInteger(BlobServerOptions.FETCH_RETRIES);
    +		if (fetchRetries >= 0) {
    +			this.numFetchRetries = fetchRetries;
    +		} else {
    +			LOG.warn("Invalid value for {}. System will attempt no retries on failed fetches of BLOBs.",
    +				BlobServerOptions.FETCH_RETRIES.key());
    +			this.numFetchRetries = 0;
    +		}
    +
    +		// Add shutdown hook to delete storage directory
    +		shutdownHook = BlobUtils.addShutdownHook(this, LOG);
    +	}
    +
    +	@Override
    +	public File getFile(BlobKey key) throws IOException {
    +		return getFileInternal(null, key);
    +	}
    +
    +	@Override
    +	public File getFile(JobID jobId, BlobKey key) throws IOException {
    +		checkNotNull(jobId);
    +		return getFileInternal(jobId, key);
    +	}
    +
    +	/**
    +	 * Returns local copy of the file for the BLOB with the given key.
    +	 * <p>
    +	 * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in
    +	 * the cache, the method will try to download it from this cache's BLOB server.
    +	 *
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
    +	 * @param blobKey
    +	 * 		The key of the desired BLOB.
    +	 *
    +	 * @return file referring to the local storage location of the BLOB.
    +	 *
    +	 * @throws IOException
    +	 * 		Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
    +	 */
    +	private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOException {
    +		checkArgument(blobKey != null, "BLOB key cannot be null.");
    +
    +		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
    +		readWriteLock.readLock().lock();
    +
    +		try {
    +			if (localFile.exists()) {
    +				return localFile;
    +			}
    +		} finally {
    +			readWriteLock.readLock().unlock();
    +		}
    +
    +		// download from the BlobServer directly
    +		// use a temporary file (thread-safe without locking)
    +		File incomingFile = createTemporaryFilename();
    +		try {
    +			BlobClient.downloadFromBlobServer(jobId, blobKey, false, incomingFile, serverAddress,
    +				blobClientConfig, numFetchRetries);
    --- End diff --
    
    only manually via the `TransientBlobCache#delete()` methods


---

[GitHub] flink issue #4358: [FLINK-7068][blob] change BlobService sub-classes for per...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4358
  
    After an offline discussion with @tillrohrmann, we agreed to have more of the transient vs. permanent BLOB handling inside the `BlobServer` by including the type into the `BlobKey`. The caches, however, remain separate (with a common base class for shared code) because of the different cache guarantees they give. In addition, we decided upon having a get-and-delete functionality for transient BLOBs (as planned by a TODO item previously).
    
    This means the following changes:
    - the `BlobServer` deletes transient BLOBs after they have been successfully downloaded to a BLOB cache (the client acknowledges that)
    - the `TransientBlobCache` and `PermanentBlobCache` differ in the functionality they provide:
      - `PermanentBlobCache` uses ref-counting for cleaning up its resources
      - `TransientBlobCache` imposes the user to manually delete requested BLOB files (from the cache!)
      - `TransientBlobCache` allows uploading (transient) BLOBs
      - `TransientBlobCache` allows non-job-related BLOBs
      - both have a GET method: `#getTransientFile()` vs. `#getPermanentFile()`
      - irrespective of the actual cache being used, the `BlobServer` decides whether to delete the BLOB (transient BLOBs only) or not (permanent BLOBs)
      - theoretically, a permanent BLOB could also be retrieved from the `TransientBlobCache` (and the other way around if job-related) - this behaviour is not used though but basic tests exist to cover it


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142126084
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.blob;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.FileUtils;
    +
    +import org.slf4j.Logger;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.Closeable;
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Abstract base class for permanent and transient BLOB files.
    + */
    +public abstract class AbstractBlobCache implements Closeable {
    +
    +	/**
    +	 * The log object used for debugging.
    +	 */
    +	protected final Logger LOG;
    +
    +	/**
    +	 * Counter to generate unique names for temporary files.
    +	 */
    +	protected final AtomicLong tempFileCounter = new AtomicLong(0);
    +
    +	protected final InetSocketAddress serverAddress;
    +
    +	/**
    +	 * Root directory for local file storage.
    +	 */
    +	protected final File storageDir;
    +
    +	/**
    +	 * Blob store for distributed file storage, e.g. in HA.
    +	 */
    +	protected final BlobView blobView;
    +
    +	protected final AtomicBoolean shutdownRequested = new AtomicBoolean();
    +
    +	/**
    +	 * Shutdown hook thread to ensure deletion of the local storage directory.
    +	 */
    +	protected final Thread shutdownHook;
    +
    +	/**
    +	 * The number of retries when the transfer fails.
    +	 */
    +	protected final int numFetchRetries;
    +
    +	/**
    +	 * Configuration for the blob client like ssl parameters required to connect to the blob
    +	 * server.
    +	 */
    +	protected final Configuration blobClientConfig;
    +
    +	/**
    +	 * Lock guarding concurrent file accesses.
    +	 */
    +	protected final ReadWriteLock readWriteLock;
    +
    +	public AbstractBlobCache(
    +			final InetSocketAddress serverAddress,
    +			final Configuration blobClientConfig,
    +			final BlobView blobView,
    +			final Logger logger) throws IOException {
    +
    +		this.LOG = logger;
    --- End diff --
    
    check not null


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142637006
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java ---
    @@ -252,6 +280,36 @@ public void testJobDeferredCleanup() throws IOException, InterruptedException {
     	}
     
     	/**
    +	 * Checks that BLOBs for the given <tt>jobId</tt> are cleaned up eventually (after calling
    +	 * {@link PermanentBlobCache#releaseJob(JobID)}, which is not done by this method!) (waits at
    +	 * most 30s).
    +	 *
    +	 * @param cache
    +	 * 		BLOB server
    +	 * @param jobId
    +	 * 		job ID or <tt>null</tt> if job-unrelated
    +	 * @param keys
    +	 * 		keys identifying BLOBs which were previously registered for the <tt>jobId</tt>
    +	 */
    +	static void verifyJobCleanup(PermanentBlobCache cache, JobID jobId, List<BlobKey> keys)
    +		throws InterruptedException, IOException {
    +		// because we cannot guarantee that there are not thread races in the build system, we
    +		// loop for a certain while until the references disappear
    +		{
    +			long deadline = System.currentTimeMillis() + 30_000L;
    --- End diff --
    
    Since the cleanup interval is only a lower bound on the time of the actual cleanup due to thread scheduling, we can't really use that. Basically, the way, `verifyJobCleanup` is used by now is that one expects that the file is deleted (but can't be sure) and therefore, we have this extra buffer of 30s. It could be made configurable, when desired, I guess, but let's not do this now.


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r137259367
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java ---
    @@ -257,7 +265,88 @@ public void testBlobServerRecovery() throws Exception {
     		try {
     			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
     
    -			BlobRecoveryITCase.testBlobServerRecovery(config, blobStoreService);
    +			BlobServerRecoveryTest.testBlobServerRecovery(config, blobStoreService);
    +		} finally {
    +			if (blobStoreService != null) {
    +				blobStoreService.closeAndCleanupAllData();
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed corrupted JARs are
    +	 * recognised during the download via a {@link org.apache.flink.runtime.blob.BlobServer}.
    +	 */
    +	@Test
    +	public void testBlobServerCorruptedFile() throws Exception {
    +		org.apache.flink.configuration.Configuration
    +			config = new org.apache.flink.configuration.Configuration();
    +		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
    +		config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
    --- End diff --
    
    I think "ZooKeeper" is not a valid state backend. What did you want to do with that?


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r137266916
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
    @@ -220,19 +260,27 @@ InputStream getInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOExcepti
     	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
     	 * @param blobKey
     	 * 		blob key associated with the requested file
    +	 * @param permanentBlob
    +	 * 		whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
     	 *
     	 * @throws IOException
     	 *         thrown if an I/O error occurs while writing the header data to the output stream
     	 */
    -	private static void sendGetHeader(OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey) throws IOException {
    +	private static void sendGetHeader(
    +			OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey, boolean permanentBlob)
    +			throws IOException {
     		checkNotNull(blobKey);
    +		checkArgument(jobId != null || !permanentBlob, "permanent BLOBs must be job-related");
     
     		// Signal type of operation
     		outputStream.write(GET_OPERATION);
     
     		// Send job ID and key
     		if (jobId == null) {
     			outputStream.write(CONTENT_NO_JOB);
    --- End diff --
    
    Should we rename this variable to `TRANSIENT_CONTENT`?


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142137151
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java ---
    @@ -64,39 +65,60 @@ public FileSystemBlobStore(FileSystem fileSystem, String storagePath) throws IOE
     	// - Put ------------------------------------------------------------------
     
     	@Override
    -	public void put(File localFile, JobID jobId, BlobKey blobKey) throws IOException {
    -		put(localFile, BlobUtils.getStorageLocationPath(basePath, jobId, blobKey));
    +	public boolean put(File localFile, JobID jobId, BlobKey blobKey) throws IOException {
    +		return put(localFile, BlobUtils.getStorageLocationPath(basePath, jobId, blobKey));
     	}
     
    -	private void put(File fromFile, String toBlobPath) throws IOException {
    +	private boolean put(File fromFile, String toBlobPath) throws IOException {
     		try (OutputStream os = fileSystem.create(new Path(toBlobPath), FileSystem.WriteMode.OVERWRITE)) {
     			LOG.debug("Copying from {} to {}.", fromFile, toBlobPath);
     			Files.copy(fromFile, os);
     		}
    +		return true;
     	}
     
     	// - Get ------------------------------------------------------------------
     
     	@Override
    -	public void get(JobID jobId, BlobKey blobKey, File localFile) throws IOException {
    -		get(BlobUtils.getStorageLocationPath(basePath, jobId, blobKey), localFile);
    +	public boolean get(JobID jobId, BlobKey blobKey, File localFile) throws IOException {
    +		return get(BlobUtils.getStorageLocationPath(basePath, jobId, blobKey), localFile, blobKey);
     	}
     
    -	private void get(String fromBlobPath, File toFile) throws IOException {
    +	private boolean get(String fromBlobPath, File toFile, BlobKey blobKey) throws IOException {
     		checkNotNull(fromBlobPath, "Blob path");
     		checkNotNull(toFile, "File");
    +		checkNotNull(blobKey, "Blob key");
     
     		if (!toFile.exists() && !toFile.createNewFile()) {
     			throw new IOException("Failed to create target file to copy to");
     		}
     
     		final Path fromPath = new Path(fromBlobPath);
    +		MessageDigest md = BlobUtils.createMessageDigest();
    +
    +		final int buffSize = 4096; // like IOUtils#BLOCKSIZE, for chunked file copying
     
     		boolean success = false;
     		try (InputStream is = fileSystem.open(fromPath);
     			FileOutputStream fos = new FileOutputStream(toFile)) {
     			LOG.debug("Copying from {} to {}.", fromBlobPath, toFile);
    -			IOUtils.copyBytes(is, fos); // closes the streams
    +
    +			// not using IOUtils.copyBytes(is, fos) here to be able to create a hash on-the-fly
    +			final byte[] buf = new byte[buffSize];
    +			int bytesRead = is.read(buf);
    +			while (bytesRead >= 0) {
    +				fos.write(buf, 0, bytesRead);
    +				md.update(buf, 0, bytesRead);
    +
    +				bytesRead = is.read(buf);
    +			}
    +
    +			// verify that file contents are correct
    +			final byte[] computedKey = md.digest();
    +			if (!Arrays.equals(computedKey, blobKey.getHash())) {
    +				throw new IOException("Detected data corruption during transfer");
    +			}
    --- End diff --
    
    Nice addition :-)


---

[GitHub] flink issue #4358: [FLINK-7068][blob] change BlobService sub-classes for per...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4358
  
    Thanks a lot for your work and patience with me @NicoK. Changes look good to me. I've rebased it onto the latest master and once Travis gives green light, I'll merge it :-)


---

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r137266511
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
    @@ -220,19 +260,27 @@ InputStream getInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOExcepti
     	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
     	 * @param blobKey
     	 * 		blob key associated with the requested file
    +	 * @param permanentBlob
    +	 * 		whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
     	 *
     	 * @throws IOException
     	 *         thrown if an I/O error occurs while writing the header data to the output stream
     	 */
    -	private static void sendGetHeader(OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey) throws IOException {
    +	private static void sendGetHeader(
    +			OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey, boolean permanentBlob)
    +			throws IOException {
     		checkNotNull(blobKey);
    +		checkArgument(jobId != null || !permanentBlob, "permanent BLOBs must be job-related");
     
     		// Signal type of operation
     		outputStream.write(GET_OPERATION);
     
     		// Send job ID and key
     		if (jobId == null) {
     			outputStream.write(CONTENT_NO_JOB);
    +		} else if (permanentBlob) {
    +			outputStream.write(CONTENT_FOR_JOB_HA);
    --- End diff --
    
    Does it make sense to rename this constant to `PERMANENT_JOB_CONTENT`?


---