Posted to common-dev@hadoop.apache.org by "Mahadev konar (JIRA)" <ji...@apache.org> on 2006/09/01 05:34:23 UTC
[jira] Updated: (HADOOP-288) RFC: Efficient file caching
[ http://issues.apache.org/jira/browse/HADOOP-288?page=all ]
Mahadev konar updated HADOOP-288:
---------------------------------
Attachment: caching-3.patch
Incorporated most of Owen's comments, except for merging all of my JUnit tests into the existing JUnit tests. I merged my local/MiniMR JUnit test into TestMiniMRLocalFS. I tried including my caching JUnit test with DFS and MiniMR in TestMiniMRWithDFS, but that made it run longer than 3 minutes. It currently takes around 140 seconds. So I have a separate JUnit test for DFS and MiniMR.
> RFC: Efficient file caching
> ---------------------------
>
> Key: HADOOP-288
> URL: http://issues.apache.org/jira/browse/HADOOP-288
> Project: Hadoop
> Issue Type: Bug
> Affects Versions: 0.6.0
> Reporter: Michel Tourn
> Assigned To: Mahadev konar
> Attachments: caching-3.patch, caching.patch, caching.patch, test.jar, test.zip
>
>
> RFC: Efficient file caching
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job.
> oo it is often constant for a month at a time
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
> The Job submitter must recreate a large jar(tar) file for every Job.
> (The jar contains both changing programs and stable dictionaries)
> The large Jar file must be propagated from the client to HDFS with
> a large replication factor.
> At the beginning of every Task, the Task tracker gets the job jar from HDFS
> and unjars it in the working directory. This can dominate task execution time.
>
> Sol.2 has nice properties but also some problems.
> It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
> It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
> It requires rsync.
>
> Sol.3 alleviates the rsync scalability problems but
> It is a dependency on an external system.
> We want something simpler and more tightly integrated with Hadoop.
>
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine
> (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as
> local files at predefined paths.
> ([1] Existing programs should be reusable with no changes.
> This is often possible because communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
> (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
> the job jar data is unjarred in the "working directory" of the Task
> the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options:
> 1. the archive/file timestamp
> 2. the MD5 of the archive/file content
> Comparing source and destination timestamps is problematic because it assumes synchronized clocks.
> Also, there is no last-modified metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do
> not require synchronized clocks, only locally monotonic time.
> (and the worst that can happen at a daylight-saving switch is a missed update or an extra update)
> The cache code could store a copy of the local timestamp
> in the same way that it caches the value of the content hash along with the source data.
>
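The content-hash option above can be sketched as follows. This is a minimal illustration, not the patch's implementation: the class name CacheKeySketch, its method names, and the idea of keeping the key in a small local sidecar file are all assumptions made for the example, and it uses plain local files rather than HDFS.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: none of these names come from the proposal or from Hadoop.
class CacheKeySketch {

    /** Hex-encoded MD5 of a file's content, streamed so large files need not fit in memory. */
    static String md5Hex(Path file) throws IOException, NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("MD5");
        try (InputStream in = Files.newInputStream(file)) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                md.update(buf, 0, n);
            }
        }
        StringBuilder sb = new StringBuilder();
        for (byte b : md.digest()) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    /** True when no key has been recorded locally yet, or the recorded key differs from the source key. */
    static boolean isStale(String sourceKey, Path localKeyFile) throws IOException {
        if (!Files.exists(localKeyFile)) {
            return true;
        }
        String cached = new String(Files.readAllBytes(localKeyFile), StandardCharsets.UTF_8).trim();
        return !cached.equals(sourceKey);
    }

    /** Records the key next to the cached copy after a successful sync (assumed sidecar-file layout). */
    static void recordKey(String sourceKey, Path localKeyFile) throws IOException {
        Files.write(localKeyFile, sourceKey.getBytes(StandardCharsets.UTF_8));
    }
}
```

In this sketch the source key would be computed once when the data is published and stored with the source, so a task node only compares two short strings instead of rehashing 1GB+ of data on every task start.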
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although
> the archive itself is not preserved.
> Supported archive formats will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
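The archive detection test described above is small enough to sketch directly. The class and method names are hypothetical, and treating the extension as case-insensitive is an assumption of this example, not something the proposal specifies.

```java
import java.util.Locale;

// Hypothetical helper illustrating detection by filename extension only.
class ArchiveDetectSketch {

    /** True when the file name ends in ".tar" or ".jar", the two extensions named in the proposal. */
    static boolean isArchive(String fileName) {
        // Assumption: compare case-insensitively so "DICT.TAR" is also treated as an archive.
        String lower = fileName.toLowerCase(Locale.ROOT);
        return lower.endsWith(".tar") || lower.endsWith(".jar");
    }
}
```

If tgz or compressed jar support were added, this test would be the one place that needs to learn the extra extensions.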
> Suppose we don't handle archives as special files:
> Pros:
> o less code, no discussion about which archive formats are supported
> o fine for large dictionary files. And when files are not large, user may as well
> put them in the Job jar as usual.
> o user code could always check and unarchive specific cached files
> (as a side-effect of MapRed task initialization)
> Cons:
> o handling small files may be inefficient
> (multiple HDFS operations, multiple hash computation,
> one 'metadata' hash file along with each small file)
> o It will not be possible to handle the Job's jar file as a special case of caching
> Cache isolation:
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
> what is the latest available version of the out-of-band data.
> Then make this version available to the MapRed job.
> Two ways to do this.
> o either set a job property just-in-time:
> addCachePathPair("/mydata/v1234/", "localcache/mydata_latest");
> (see Job Configuration for meaning of this)
> o or publish the decision as an HDFS file containing the version.
> then rely on user code to read the version, and manually populate the cache:
> Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
> (see MapReduce API for meaning of this)
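The "test what is the latest available version" step above could look roughly like this. It is a sketch under stated assumptions: version directories are named "v" followed by digits (as in the /mydata/v1234/ example), the caller has already listed the directory names, and the names LatestVersionSketch and latest are invented for illustration.

```java
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: picks the highest-numbered "vNNNN" directory from a listing.
class LatestVersionSketch {

    // Assumption: version directories look like "v1234"; anything else is ignored.
    private static final Pattern VERSION_DIR = Pattern.compile("v(\\d{1,18})");

    /** Returns the directory name with the largest version number, or empty if none match. */
    static Optional<String> latest(List<String> dirNames) {
        String best = null;
        long bestNum = -1;
        for (String name : dirNames) {
            Matcher m = VERSION_DIR.matcher(name);
            if (m.matches()) {
                long num = Long.parseLong(m.group(1));
                if (num > bestNum) {
                    bestNum = num;
                    best = name;
                }
            }
        }
        return Optional.ofNullable(best);
    }
}
```

The chosen name would then be fixed for the whole job, for example by passing "/mydata/" + name + "/" as the HDFS side of a cache path pair, so that all tasks see the same version even if a newer one is published mid-job.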
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
> HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in the map() (or reduce()) function:
> in MyMapper constructor (or in map() on first record) user is asked to add:
>
> Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
> Cache.syncCache("..."); //etc.
>
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
> communicate the list of pairs (hdfs path; local path)
>
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
> no special Job configuration:
> it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
> some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");
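One way the list of (hdfs path; local path) pairs could travel in job properties is sketched below. This is only an illustration: java.util.Properties stands in for JobConf, the property name and the ";" and "," separators are assumptions of the example, and nothing here is taken from the attached patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical stand-in for the JobConf side of the proposal.
class CachePathPairsSketch {

    // Assumed property name; the proposal does not name one.
    static final String KEY = "example.cache.path.pairs";

    /** Appends one (hdfsPath; localPath) pair to the property value. */
    static void addCachePathPair(Properties conf, String hdfsPath, String localPath) {
        for (String p : new String[] {hdfsPath, localPath}) {
            // The separators are reserved in this encoding, so reject paths that contain them.
            if (p.contains(";") || p.contains(",")) {
                throw new IllegalArgumentException("path must not contain ';' or ',': " + p);
            }
        }
        String pair = hdfsPath + ";" + localPath;
        String existing = conf.getProperty(KEY);
        conf.setProperty(KEY, existing == null ? pair : existing + "," + pair);
    }

    /** Decodes the pairs in insertion order; each element is {hdfsPath, localPath}. */
    static List<String[]> getCachePathPairs(Properties conf) {
        List<String[]> pairs = new ArrayList<>();
        String value = conf.getProperty(KEY);
        if (value == null || value.isEmpty()) {
            return pairs;
        }
        for (String pair : value.split(",")) {
            pairs.add(pair.split(";", 2));
        }
        return pairs;
    }
}
```

On the task side, the framework would read the pairs back with getCachePathPairs and synchronize each one before the first record is processed, which is the same work that option 1 leaves to user code.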