Posted to common-dev@hadoop.apache.org by "Doug Cutting (JIRA)" <ji...@apache.org> on 2006/09/15 00:18:55 UTC

[jira] Resolved: (HADOOP-288) RFC: Efficient file caching

     [ http://issues.apache.org/jira/browse/HADOOP-288?page=all ]

Doug Cutting resolved HADOOP-288.
---------------------------------

    Fix Version/s: 0.7.0
       Resolution: Fixed

I just committed this.  Thanks, Mahadev!

> RFC: Efficient file caching
> ---------------------------
>
>                 Key: HADOOP-288
>                 URL: http://issues.apache.org/jira/browse/HADOOP-288
>             Project: Hadoop
>          Issue Type: Bug
>    Affects Versions: 0.6.0
>            Reporter: Michel Tourn
>         Assigned To: Mahadev konar
>             Fix For: 0.7.0
>
>         Attachments: caching-3.patch, caching-4.patch, caching-5.patch, caching.patch, caching.patch, test.jar, test.zip
>
>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non-Hadoop: for each task node, run rsync from a single source for the data.
> Sol 3: Non-Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow, for several reasons:
>  The Job submitter must recreate a large jar (tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries.)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well to large clusters (many concurrent rsync read requests, i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems, but:
>       it introduces a dependency on an external system, and
>       we want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> Non-Java MapReduce programs do not have (or want[1]) APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible because communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in the shipped data).
> This situation might occur with an 'extractor' job (one that gets data from an external source, like the Nutch crawler).
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task;
>  the jar contains both MapRed java code (added to the classpath) and the shipped data files.
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination timestamps is problematic because it assumes synchronized clocks.
> Also, there is no last-modified metadata in HDFS (for good reasons, like scalability of metadata operations).
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time
> (and the worst that can happen at a daylight-savings switch is a missed update or an extra update).
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
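> A minimal sketch of option 2, the content-hash check, using only the standard
> java.security API (the class and method names here are hypothetical, not part
> of this proposal):
>
>     import java.io.FileInputStream;
>     import java.io.IOException;
>     import java.io.InputStream;
>     import java.security.MessageDigest;
>     import java.security.NoSuchAlgorithmException;
>
>     // Hypothetical helper: compute the MD5 of a file's content.
>     // The cached copy is out-of-date when this digest differs from
>     // the 'metadata' hash stored along with the cached data.
>     public class CacheKey {
>       public static byte[] md5Of(String path)
>           throws IOException, NoSuchAlgorithmException {
>         MessageDigest md = MessageDigest.getInstance("MD5");
>         InputStream in = new FileInputStream(path);
>         try {
>           byte[] buf = new byte[8192];
>           int n;
>           while ((n = in.read(buf)) != -1) {
>             md.update(buf, 0, n);
>           }
>         } finally {
>           in.close();
>         }
>         return md.digest();
>       }
>     }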
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel the HDFS directories),
> so bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o the cache key is computed on the archive and preserved, although 
>   the archive itself is not preserved.
> Supported archive formats will be: tar (and maybe tgz or compressed jar).
> Archive detection test: by filename extension ".tar" or ".jar"
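> For illustration, a sketch of that detection test (the class name is hypothetical):
>
>     // Treat a path as an archive iff its filename ends in ".tar" or ".jar".
>     public class ArchiveTest {
>       static boolean isArchive(String name) {
>         return name.endsWith(".tar") || name.endsWith(".jar");
>       }
>     }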
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files; when files are not large, the user may as well
>    put them in the Job jar as usual
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time, causing some tasks to fail.
> The file may change (atomically), so different tasks use different versions.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that determines
>   the latest available version of the out-of-band data.
>   Then make this version available to the MapRed job.
>   There are two ways to do this:
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version,
>     then rely on user code to read the version and manually populate the cache,
>     as sketched after this list:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for the meaning of this)
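> A sketch of that second variant (Cache.syncCache is the API proposed in this
> RFC; the "/mydata/LATEST" version file and the class name are assumptions for
> illustration; the FileSystem calls are the existing Hadoop API):
>
>     import java.io.BufferedReader;
>     import java.io.IOException;
>     import java.io.InputStreamReader;
>     import org.apache.hadoop.conf.Configuration;
>     import org.apache.hadoop.fs.FileSystem;
>     import org.apache.hadoop.fs.Path;
>
>     // Hypothetical driver step, run before submitting the MapRed job:
>     // read the published version from HDFS, then populate the cache.
>     public class PickLatestVersion {
>       public static void main(String[] args) throws IOException {
>         FileSystem fs = FileSystem.get(new Configuration());
>         BufferedReader in = new BufferedReader(
>             new InputStreamReader(fs.open(new Path("/mydata/LATEST"))));
>         String version = in.readLine();  // e.g. "v1234"
>         in.close();
>         Cache.syncCache("/mydata/" + version + "/", "localcache/mydata_latest");
>       }
>     }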
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker.
> o Client-to-HDFS stage.
> Options: a simple option is to do nothing here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in the map() (or reduce()) function:
>  in the MyMapper constructor (or in map() on the first record) the user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
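> For concreteness, a minimal sketch of option 1 against the old mapred API
> (MyMapper and the paths are the examples above; Cache.syncCache is the API
> proposed here, and we assume it reports failures as IOException):
>
>     import java.io.IOException;
>     import org.apache.hadoop.io.Writable;
>     import org.apache.hadoop.io.WritableComparable;
>     import org.apache.hadoop.mapred.JobConf;
>     import org.apache.hadoop.mapred.Mapper;
>     import org.apache.hadoop.mapred.OutputCollector;
>     import org.apache.hadoop.mapred.Reporter;
>
>     // Hypothetical mapper: the dictionary is synced once, before any
>     // records are processed; map() then reads the local copy as a file.
>     public class MyMapper implements Mapper {
>       public MyMapper() {
>         try {
>           Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>         } catch (IOException e) {
>           // Assumption: syncCache throws IOException on failure.
>           throw new RuntimeException("cache sync failed", e);
>         }
>       }
>       public void configure(JobConf job) {}
>       public void map(WritableComparable key, Writable value,
>                       OutputCollector output, Reporter reporter)
>           throws IOException {
>         // ... look up each record in relative/local/fileordir ...
>       }
>       public void close() {}
>     }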
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially, then they are unarchived on the destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> With no change in MapReduce framework code (see above):
>  no special Job configuration; 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> With the logic in the MapReduce framework (see above):
>  some simple Job configuration, e.g.:
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira