You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-dev@hadoop.apache.org by "Michel Tourn (JIRA)" <ji...@apache.org> on 2006/06/08 03:28:29 UTC

[jira] Created: (HADOOP-288) RFC: Efficient file caching

RFC: Efficient file caching 
----------------------------

         Key: HADOOP-288
         URL: http://issues.apache.org/jira/browse/HADOOP-288
     Project: Hadoop
        Type: Bug

    Reporter: Michel Tourn
 Assigned to: Michel Tourn 


RFC: Efficient file caching 
(on Hadoop Task nodes, for benefit of MapReduce Tasks)
------------------------------------------------------

We will start implementing this soon. Please provide feedback and improvements to this plan.

The header "Options:" indicates places where simple choices must be made.


Problem:
-------
o MapReduce tasks require access to additional out-of-band data ("dictionaries")

This out-of-band data is:

o in addition to the map/reduce inputs.
o large (1GB+)
o broadcast (same data is required on all the Task nodes)
o changes "infrequently", in particular:
oo it is always constant for all the Tasks in a Job. 
oo it is often constant for a month at a time 
oo it may be shared across team members
o sometimes used by pure-Java MapReduce programs
o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
o (future) used by programs that use HDFS and Task-trackers but not MapReduce.

Existing Solutions to the problem:
---------------------------------
These solutions are not good enough. The present proposal is to do Sol 1 with caching.

Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
Sol 3: Non  Hadoop: use BitTorrent, etc.

Sol.1 is correct but slow for many reasons:
 The Job submitter must recreate a large jar(tar) file for every Job.
  (The jar contains both changing programs and stable dictionaries)
 The large Jar file must be propagated from the client to HDFS with 
 a large replication factor. 
 At the beginning of every Task, the Task tracker gets the job jar from HDFS 
 and unjars it in the working directory. This can dominate task execution time.
 
Sol.2 has nice properties but also some problems.
 It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
 It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
 It requires rsync.
 
Sol.3 alleviates the rsync scalability problems but 
      It is a dependency on an external system. 
      We want something simpler and more tightly integrated with Hadoop.
      

Staging (uploading) out-of-band data:
------------------------------------
The out-of-band data will often originate on the local filesystem of a user machine 
 (i.e. a MapReduce job submitter)
Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
o HDFS has (wide) replication. This enables scalable broadcast later.
o HDFS is an available channel to move data from clients to all task machines.
o HDFS is convenient as a shared location among Hadoop team members.


Accessing (downloading) out-of-band data:
----------------------------------------
The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
Instead these programs just want to access out-of-band data as 
 local files at predefined paths.
([1] Existing programs should be reusable with no changes. 
 This is often possible bec. communication is over stdin/stdout.)



Job's jar file as a special case:
--------------------------------
One use case is to allow users to make the job jar itself cachable.

This is only useful in cases where NOTHING changes when a job is resubmitted
 (no MapRed code changes and no changes in shipped data)
This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)

Currently the Hadoop mapred-jar mechanism works in this way:
 the job jar data is unjarred in the "working directory" of the Task 
 the jar contains both MapRed java code (added to classpath)



Cache synchronization:
---------------------

The efficient implementation of the out-of-band data distribution
is mostly a cache synchronization problem.
A list of the various aspects where choices must be made follows.


Cache key:
---------
How do you test that the cached copy is out-of-date?

Options: 
1. the archive/file timestamp 
2. the MD5 of the archive/file content

Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)

Timestamps stored with the source ('last-propagate-time') do 
 not require synchronized clocks, only locally monotonic time. 
(and the worse which can happen at daylight-savings switch is a missed update or an extra-update)

The cache code could store a copy of the local timestamp 
in the same way that it caches the value of the content hash along with the source data.
 


Cachable unit:
-------------
Options: individual files or archives or both.

Note:
At the API level, directories will be processed recursively 
(and the local FS directories will parallel HDFS directories)
So bulk operations are always possible using directories.
The question here is whether to handle archives as an additional bulk mechanism.


Archives are special because:
o unarchiving occurs transparently as part of the cache sync
o The cache key is computed on the archive and preserved although 
  the archive itself is not preserved.
Supported archive format will be: tar (maybe tgz or compressed jar)
Archive detection test: by filename extension ".tar" or ".jar"

Suppose we don't handle archives as special files:
Pros:
 o less code, no discussion about which archive formats are supported
 o fine for large dictionary files. And when files are not large, user may as well
   put them in the Job jar as usual.
 o user code could always check and unarchive specific cached files
   (as a side-effect of MapRed task initialization)
Cons:
 o handling small files may be inefficient 
  (multiple HDFS operations, multiple hash computation, 
   one 'metadata' hash file along with each small file)
 o It will not be possible to handle the Job's jar file as a special case of caching 



Cache isolation: 
---------------
In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
The file may become unavailable for a short period of time and some tasks fail.
The file may change (atomically) and different tasks use a different version.

This isolation problem is not addressed in this proposal.
Standard solutions to the isolation problem are:

o Assume that Jobs and interfering cache updates won't occur concurrently.

o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.

o Before running the MapRed job, run a non-distributed application that tests
  what is the latest available version of the out-of-band data. 
  Then make this version available to the MapRed job.
  Two ways to do this. 
  o either set a job property just-in-time:
    addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
    (see Job Configuration for meaning of this)
  o or publish the decision as an HDFS file containing the version.
    then rely on user code to read the version, and manually populate the cache:
    Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
    (see MapReduce API for meaning of this)


Cache synchronization stages:
----------------------------
There are two stages: Client-to-HDFS and HDFS-to-TaskTracker

o Client-to-HDFS stage.
Options: A simple option is to not do anything here, i.e. rely on the user.

This is a reasonable option given previous remarks on the role of HDFS:
 HDFS is a staging/publishing area and a natural shared location.
In particular this means that the system need not track 
where the client files come from.


o HDFS-to-TaskTracker:
Client-to-HDFS synchronization (if done at all) should happen before this.
Then HDFS-to-TaskTracker synchronization must happen right before 
the data is needed on a node.



MapReduce cache API:
-------------------
Options:

1. No change in MapReduce framework code:
require the user to put this logic in map() (or reduce) function:

 in MyMapper constructor (or in map() on first record) user is asked to add:
 
    Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
    Cache.syncCache("..."); //etc.
  
-----

2. Put this logic in MapReduce framework and use Job properties to
   communicate the list of pairs (hdfs path; local path)
 
Directories are processed recursively.
If archives are treated specially then they are unarchived on destination.

 
MapReduce Job Configuration:
---------------------------
Options:

with No change in MapReduce framework code (see above)
 no special Job configuration: 
   it is up to the MapRed writer to configure and run the cache operations.

---
with Logic in MapReduce framework (see above)
 some simple Job configuration

JobConf.addCachePathPair(String, String)
JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");



-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
   http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see:
   http://www.atlassian.com/software/jira


Re: [jira] Created: (HADOOP-288) RFC: Efficient file caching

Posted by Eric Baldeschwieler <er...@yahoo-inc.com>.
Seems like we should do our best to be sure that every task does get  
the same file.  This would be a nice property and maybe not that  
hard.  Could at least check signatures and fail if there is a change.

Right now the jar file is copied and uncompressed with each task,  
correct?  If so, there is more utility to caching this file than you  
propose.

I don't understand your proposed API, when you start talking about  
files and recursive directories.  I propose we support only archives  
as inputs (or single files).  Can enforce extensions, or provide an  
unpacking config option.

Where would you extend the existing map reduce API?  To what extent  
is this "just library code"?  What APIs are added to job config, the  
task tracker, etc?

On Jun 7, 2006, at 6:28 PM, Michel Tourn (JIRA) wrote:

> RFC: Efficient file caching
> ----------------------------
>
>          Key: HADOOP-288
>          URL: http://issues.apache.org/jira/browse/HADOOP-288
>      Project: Hadoop
>         Type: Bug
>
>     Reporter: Michel Tourn
>  Assigned to: Michel Tourn
>
>
> RFC: Efficient file caching
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
>
> We will start implementing this soon. Please provide feedback and  
> improvements to this plan.
>
> The header "Options:" indicates places where simple choices must be  
> made.
>
>
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data  
> ("dictionaries")
>
> This out-of-band data is:
>
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job.
> oo it is often constant for a month at a time
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop- 
> Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not  
> MapReduce.
>
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do  
> Sol 1 with caching.
>
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce  
> Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source  
> for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
>
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with
>  a large replication factor.
>  At the beginning of every Task, the Task tracker gets the job jar  
> from HDFS
>  and unjars it in the working directory. This can dominate task  
> execution time.
>
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync  
> read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the  
> cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>
> Sol.3 alleviates the rsync scalability problems but
>       It is a dependency on an external system.
>       We want something simpler and more tightly integrated with  
> Hadoop.
>
>
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem  
> of a user machine
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out- 
> of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all  
> task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
>
>
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes.
>  This is often possible bec. communication is over stdin/stdout.)
>
>
>
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
>
> This is only useful in cases where NOTHING changes when a job is  
> resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from  
> an external source: like Nutch crawler)
>
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task
>  the jar contains both MapRed java code (added to classpath)
>
>
>
> Cache synchronization:
> ---------------------
>
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
>
>
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
>
> Options:
> 1. the archive/file timestamp
> 2. the MD5 of the archive/file content
>
> Comparing source and destination Timestamps is problematic bec. it  
> assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons,  
> like scalability of metadata ops)
>
> Timestamps stored with the source ('last-propagate-time') do
>  not require synchronized clocks, only locally monotonic time.
> (and the worse which can happen at daylight-savings switch is a  
> missed update or an extra-update)
>
> The cache code could store a copy of the local timestamp
> in the same way that it caches the value of the content hash along  
> with the source data.
>
>
>
> Cachable unit:
> -------------
> Options: individual files or archives or both.
>
> Note:
> At the API level, directories will be processed recursively
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional  
> bulk mechanism.
>
>
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
>
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large,  
> user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient
>   (multiple HDFS operations, multiple hash computation,
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a  
> special case of caching
>
>
>
> Cache isolation:
> ---------------
> In some cases it may be a problem if the cached HDFS files are  
> updated while a Job is in progress:
> The file may become unavailable for a short period of time and some  
> tasks fail.
> The file may change (atomically) and different tasks use a  
> different version.
>
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
>
> o Assume that Jobs and interfering cache updates won't occur  
> concurrently.
>
> o Put a version number in the HDFS file paths and refer to a hard- 
> coded version in the Job code.
>
> o Before running the MapRed job, run a non-distributed application  
> that tests
>   what is the latest available version of the out-of-band data.
>   Then make this version available to the MapRed job.
>   Two ways to do this.
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest");
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually  
> populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/ 
> fileordir");
>     (see MapReduce API for meaning of this)
>
>
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
>
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on  
> the user.
>
> This is a reasonable option given previous remarks on the role of  
> HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track
> where the client files come from.
>
>
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen  
> before this.
> Then HDFS-to-TaskTracker synchronization must happen right before
> the data is needed on a node.
>
>
>
> MapReduce cache API:
> -------------------
> Options:
>
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>
>  in MyMapper constructor (or in map() on first record) user is  
> asked to add:
>
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/ 
> fileordir");
>     Cache.syncCache("..."); //etc.
>
> -----
>
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on  
> destination.
>
>
> MapReduce Job Configuration:
> ---------------------------
> Options:
>
> with No change in MapReduce framework code (see above)
>  no special Job configuration:
>    it is up to the MapRed writer to configure and run the cache  
> operations.
>
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
>
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/ 
> fileordir");
>
>
>
> -- 
> This message is automatically generated by JIRA.
> -
> If you think it was sent incorrectly contact one of the  
> administrators:
>    http://issues.apache.org/jira/secure/Administrators.jspa
> -
> For more information on JIRA, see:
>    http://www.atlassian.com/software/jira
>


Re: [jira] Commented: (HADOOP-288) RFC: Efficient file caching

Posted by Paul Sutter <su...@gmail.com>.
Great, it sounds like you've really thought it through.

One last little suggestion: For case (1), It may be easier to provide a C
language API to DFS than to create all that caching infrastructure, which
may have some other benefits, but you certainly know what your needs are.

And yes, if you have lots of little files, DFS isnt the place for them.

On 6/20/06, Michel Tourn (JIRA) <ji...@apache.org> wrote:
>
>    [
> http://issues.apache.org/jira/browse/HADOOP-288?page=comments#action_12416978]
>
> Michel Tourn commented on HADOOP-288:
> -------------------------------------
>
> Yes, staying within the context of DFS could be simpler.
> Note however that we have these requirements:
> 1. archive files are sometimes used by non-Java non-Hadoop MapReduce
> programs (using http://wiki.apache.org/lucene-
> hadoop/HadoopStreaming)
> 2. avoid repetitive expansion of the job jar and of other archives for
> each Task in the Job.
> 3. In case of many small files, avoid a per-file overhead for DFS and
> cache operations.
>
> Because of 1. the files must really be native OS files, not DFS files.
> For such general tools, the "common-denominator API" is only: the base
> directory for the cache.
>
> Today, unarchiving the job jar occurs in Hadoop, not in the MapRed
> application. But it is not cached.
> Because of 2. and 3. the unarchiving process itself must be cacheable.
> So unarchiving must occur in the Hadoop framework, not the MapRed
> application.
>
>
> > RFC: Efficient file caching
> > ---------------------------
> >
> >          Key: HADOOP-288
> >          URL: http://issues.apache.org/jira/browse/HADOOP-288
> >      Project: Hadoop
> >         Type: Bug
>
> >     Reporter: Michel Tourn
> >     Assignee: Michel Tourn
>
> >
> > RFC: Efficient file caching
> > (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> > ------------------------------------------------------
> > We will start implementing this soon. Please provide feedback and
> improvements to this plan.
> > The header "Options:" indicates places where simple choices must be
> made.
> > Problem:
> > -------
> > o MapReduce tasks require access to additional out-of-band data
> ("dictionaries")
> > This out-of-band data is:
> > o in addition to the map/reduce inputs.
> > o large (1GB+)
> > o broadcast (same data is required on all the Task nodes)
> > o changes "infrequently", in particular:
> > oo it is always constant for all the Tasks in a Job.
> > oo it is often constant for a month at a time
> > oo it may be shared across team members
> > o sometimes used by pure-Java MapReduce programs
> > o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> > o (future) used by programs that use HDFS and Task-trackers but not
> MapReduce.
> > Existing Solutions to the problem:
> > ---------------------------------
> > These solutions are not good enough. The present proposal is to do Sol 1
> with caching.
> > Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job
> jar file.
> > Sol 2: Non  Hadoop: for each task node run rsync from single source for
> data.
> > Sol 3: Non  Hadoop: use BitTorrent, etc.
> > Sol.1 is correct but slow for many reasons:
> >  The Job submitter must recreate a large jar(tar) file for every Job.
> >   (The jar contains both changing programs and stable dictionaries)
> >  The large Jar file must be propagated from the client to HDFS with
> >  a large replication factor.
> >  At the beginning of every Task, the Task tracker gets the job jar from
> HDFS
> >  and unjars it in the working directory. This can dominate task
> execution time.
> >
> > Sol.2 has nice properties but also some problems.
> >  It does not scale well with large clusters (many concurrent rsync read
> requests i.e. single-source broadcast)
> >  It assumes that Hadoop users can upload data using rsync to the cluster
> nodes. As a policy, this is not allowed.
> >  It requires rsync.
> >
> > Sol.3 alleviates the rsync scalability problems but
> >       It is a dependency on an external system.
> >       We want something simpler and more tightly integrated with Hadoop.
> >
> > Staging (uploading) out-of-band data:
> > ------------------------------------
> > The out-of-band data will often originate on the local filesystem of a
> user machine
> >  (i.e. a MapReduce job submitter)
> > Nevertheless it makes sense to use HDFS to store the original
> out-of-band data because:
> > o HDFS has (wide) replication. This enables scalable broadcast later.
> > o HDFS is an available channel to move data from clients to all task
> machines.
> > o HDFS is convenient as a shared location among Hadoop team members.
> > Accessing (downloading) out-of-band data:
> > ----------------------------------------
> > The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> > Instead these programs just want to access out-of-band data as
> >  local files at predefined paths.
> > ([1] Existing programs should be reusable with no changes.
> >  This is often possible bec. communication is over stdin/stdout.)
> > Job's jar file as a special case:
> > --------------------------------
> > One use case is to allow users to make the job jar itself cachable.
> > This is only useful in cases where NOTHING changes when a job is
> resubmitted
> >  (no MapRed code changes and no changes in shipped data)
> > This situation might occur with an 'extractor' job (gets data from an
> external source: like Nutch crawler)
> > Currently the Hadoop mapred-jar mechanism works in this way:
> >  the job jar data is unjarred in the "working directory" of the Task
> >  the jar contains both MapRed java code (added to classpath)
> > Cache synchronization:
> > ---------------------
> > The efficient implementation of the out-of-band data distribution
> > is mostly a cache synchronization problem.
> > A list of the various aspects where choices must be made follows.
> > Cache key:
> > ---------
> > How do you test that the cached copy is out-of-date?
> > Options:
> > 1. the archive/file timestamp
> > 2. the MD5 of the archive/file content
> > Comparing source and destination Timestamps is problematic bec. it
> assumes synchronized clocks.
> > Also there is no last-modif metadata in HDFS (for good reasons, like
> scalability of metadata ops)
> > Timestamps stored with the source ('last-propagate-time') do
> >  not require synchronized clocks, only locally monotonic time.
> > (and the worse which can happen at daylight-savings switch is a missed
> update or an extra-update)
> > The cache code could store a copy of the local timestamp
> > in the same way that it caches the value of the content hash along with
> the source data.
> >
> > Cachable unit:
> > -------------
> > Options: individual files or archives or both.
> > Note:
> > At the API level, directories will be processed recursively
> > (and the local FS directories will parallel HDFS directories)
> > So bulk operations are always possible using directories.
> > The question here is whether to handle archives as an additional bulk
> mechanism.
> > Archives are special because:
> > o unarchiving occurs transparently as part of the cache sync
> > o The cache key is computed on the archive and preserved although
> >   the archive itself is not preserved.
> > Supported archive format will be: tar (maybe tgz or compressed jar)
> > Archive detection test: by filename extension ".tar" or ".jar"
> > Suppose we don't handle archives as special files:
> > Pros:
> >  o less code, no discussion about which archive formats are supported
> >  o fine for large dictionary files. And when files are not large, user
> may as well
> >    put them in the Job jar as usual.
> >  o user code could always check and unarchive specific cached files
> >    (as a side-effect of MapRed task initialization)
> > Cons:
> >  o handling small files may be inefficient
> >   (multiple HDFS operations, multiple hash computation,
> >    one 'metadata' hash file along with each small file)
> >  o It will not be possible to handle the Job's jar file as a special
> case of caching
> > Cache isolation:
> > ---------------
> > In some cases it may be a problem if the cached HDFS files are updated
> while a Job is in progress:
> > The file may become unavailable for a short period of time and some
> tasks fail.
> > The file may change (atomically) and different tasks use a different
> version.
> > This isolation problem is not addressed in this proposal.
> > Standard solutions to the isolation problem are:
> > o Assume that Jobs and interfering cache updates won't occur
> concurrently.
> > o Put a version number in the HDFS file paths and refer to a hard-coded
> version in the Job code.
> > o Before running the MapRed job, run a non-distributed application that
> tests
> >   what is the latest available version of the out-of-band data.
> >   Then make this version available to the MapRed job.
> >   Two ways to do this.
> >   o either set a job property just-in-time:
> >     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest");
> >     (see Job Configuration for meaning of this)
> >   o or publish the decision as an HDFS file containing the version.
> >     then rely on user code to read the version, and manually populate
> the cache:
> >     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
> >     (see MapReduce API for meaning of this)
> > Cache synchronization stages:
> > ----------------------------
> > There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> > o Client-to-HDFS stage.
> > Options: A simple option is to not do anything here, i.e. rely on the
> user.
> > This is a reasonable option given previous remarks on the role of HDFS:
> >  HDFS is a staging/publishing area and a natural shared location.
> > In particular this means that the system need not track
> > where the client files come from.
> > o HDFS-to-TaskTracker:
> > Client-to-HDFS synchronization (if done at all) should happen before
> this.
> > Then HDFS-to-TaskTracker synchronization must happen right before
> > the data is needed on a node.
> > MapReduce cache API:
> > -------------------
> > Options:
> > 1. No change in MapReduce framework code:
> > require the user to put this logic in map() (or reduce) function:
> >  in MyMapper constructor (or in map() on first record) user is asked to
> add:
> >
> >     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
> >     Cache.syncCache("..."); //etc.
> >
> > -----
> > 2. Put this logic in MapReduce framework and use Job properties to
> >    communicate the list of pairs (hdfs path; local path)
> >
> > Directories are processed recursively.
> > If archives are treated specially then they are unarchived on
> destination.
> >
> > MapReduce Job Configuration:
> > ---------------------------
> > Options:
> > with No change in MapReduce framework code (see above)
> >  no special Job configuration:
> >    it is up to the MapRed writer to configure and run the cache
> operations.
> > ---
> > with Logic in MapReduce framework (see above)
> >  some simple Job configuration
> > JobConf.addCachePathPair(String, String)
> > JobConf.addCachePathPair("/hdfs/path/fileordir",
> "relative/local/fileordir");
>
> --
> This message is automatically generated by JIRA.
> -
> If you think it was sent incorrectly contact one of the administrators:
>   http://issues.apache.org/jira/secure/Administrators.jspa
> -
> For more information on JIRA, see:
>   http://www.atlassian.com/software/jira
>
>

[jira] Commented: (HADOOP-288) RFC: Efficient file caching

Posted by "Doug Cutting (JIRA)" <ji...@apache.org>.
    [ http://issues.apache.org/jira/browse/HADOOP-288?page=comments#action_12433459 ] 
            
Doug Cutting commented on HADOOP-288:
-------------------------------------

+1  That sounds great!

> RFC: Efficient file caching
> ---------------------------
>
>                 Key: HADOOP-288
>                 URL: http://issues.apache.org/jira/browse/HADOOP-288
>             Project: Hadoop
>          Issue Type: Bug
>    Affects Versions: 0.6.0
>            Reporter: Michel Tourn
>         Assigned To: Mahadev konar
>         Attachments: caching-3.patch, caching-4.patch, caching.patch, caching.patch, test.jar, test.zip
>
>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] Commented: (HADOOP-288) RFC: Efficient file caching

Posted by "Doug Cutting (JIRA)" <ji...@apache.org>.
    [ http://issues.apache.org/jira/browse/HADOOP-288?page=comments#action_12432963 ] 
            
Doug Cutting commented on HADOOP-288:
-------------------------------------

If we are using URIs then shouldn't the parameters be java.net.URI?

> RFC: Efficient file caching
> ---------------------------
>
>                 Key: HADOOP-288
>                 URL: http://issues.apache.org/jira/browse/HADOOP-288
>             Project: Hadoop
>          Issue Type: Bug
>    Affects Versions: 0.6.0
>            Reporter: Michel Tourn
>         Assigned To: Mahadev konar
>         Attachments: caching-3.patch, caching-4.patch, caching.patch, caching.patch, test.jar, test.zip
>
>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] Commented: (HADOOP-288) RFC: Efficient file caching

Posted by "Mahadev konar (JIRA)" <ji...@apache.org>.
    [ http://issues.apache.org/jira/browse/HADOOP-288?page=comments#action_12432993 ] 
            
Mahadev konar commented on HADOOP-288:
--------------------------------------

sorry to have caused confusion. The files are specified as dfs://hostname:port/pathtofile. These are later converted to URI's by the framework to get the dfs and absolute paths. This is all similar to distcp where the input and outfiles can be specified as dfs://......

> RFC: Efficient file caching
> ---------------------------
>
>                 Key: HADOOP-288
>                 URL: http://issues.apache.org/jira/browse/HADOOP-288
>             Project: Hadoop
>          Issue Type: Bug
>    Affects Versions: 0.6.0
>            Reporter: Michel Tourn
>         Assigned To: Mahadev konar
>         Attachments: caching-3.patch, caching-4.patch, caching.patch, caching.patch, test.jar, test.zip
>
>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] Assigned: (HADOOP-288) RFC: Efficient file caching

Posted by "Mahadev konar (JIRA)" <ji...@apache.org>.
     [ http://issues.apache.org/jira/browse/HADOOP-288?page=all ]

Mahadev konar reassigned HADOOP-288:
------------------------------------

    Assign To: Mahadev konar  (was: Michel Tourn)

> RFC: Efficient file caching
> ---------------------------
>
>          Key: HADOOP-288
>          URL: http://issues.apache.org/jira/browse/HADOOP-288
>      Project: Hadoop
>         Type: Bug

>     Reporter: Michel Tourn
>     Assignee: Mahadev konar

>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
   http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see:
   http://www.atlassian.com/software/jira


[jira] Resolved: (HADOOP-288) RFC: Efficient file caching

Posted by "Doug Cutting (JIRA)" <ji...@apache.org>.
     [ http://issues.apache.org/jira/browse/HADOOP-288?page=all ]

Doug Cutting resolved HADOOP-288.
---------------------------------

    Fix Version/s: 0.7.0
       Resolution: Fixed

I just committed this.  Thanks, Mahadev!

> RFC: Efficient file caching
> ---------------------------
>
>                 Key: HADOOP-288
>                 URL: http://issues.apache.org/jira/browse/HADOOP-288
>             Project: Hadoop
>          Issue Type: Bug
>    Affects Versions: 0.6.0
>            Reporter: Michel Tourn
>         Assigned To: Mahadev konar
>             Fix For: 0.7.0
>
>         Attachments: caching-3.patch, caching-4.patch, caching-5.patch, caching.patch, caching.patch, test.jar, test.zip
>
>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] Updated: (HADOOP-288) RFC: Efficient file caching

Posted by "Mahadev konar (JIRA)" <ji...@apache.org>.
     [ http://issues.apache.org/jira/browse/HADOOP-288?page=all ]

Mahadev konar updated HADOOP-288:
---------------------------------

    Attachment: caching.patch
                test.zip
                test.jar

I have attached two other files to the patch which are small .jar and .zip files needed for the junit tests.

Caching and job.jars:

Two parts to the patch:
1) Unjarring job.jar once for a job
2) Archiving archives/files locally 

1) Unjarring of job.jar
Currently the job.jar is unjarred for each task. This patch makes the framework do the unjarring only once for the job. The current working directory for each task if the same directory where the job is unjarred once. 
So the directory structure now looks like:

tasktracker/jobcache/jod_id/workdir -- the dir where the job is unjarred once
----------------------------/job_id/task_id/task_specific_job.xml

The current working dir for each task is the workdir.

2) Archiving of files- 

i) Each job can ask for a set of archives/files to be localized. The api for that is 
 jobconf.setCacheArchives(comma seperated list of archives)
 or 
 jobconf.setCacheFiles(comma seperated list of files).
 The comma seperated list can be specified as absolute path to files/caches (eg. /user/mahadev/test.jar) if they are in the same dfs as the mapred is running on or else they can be specified using urls as in copyfiles ( dfs://hostname:port/path_to_cache )
 There are two apis provided so that users who do not want their archives to be unarchived by the framework or just want to localize a file should use the second api.

ii) These archives/files should be present in the specified DFS for localizing.
    The user makes sure that these archives are present in the DFS before he submits the job else an error will be thrown that these archives are not present in DFS.

iii) Localization happens across jobs. So each cache archive/file has a key and the key is the url of the cache (in case of absolute path its the absolute path) 

iv) Whenever a job is started, the first tasks for these jobs will localize the archives. 
    The archives are stored in mapred.local/tasktracker/archives/hostname_of_dfs/dfs path of the archive.
    So an archive called /user/mahadev/k.zip  on a dfs running on machine1 would be unarchived in  
    dir =  mapred.local/tasktracker/archives/machine1/user/mahadev/k.zip/
    This dir contains the unarchived contents of k.zip.
    If it is just a file (/user/mahadev/test.txt and not an archive, then it is stored in a directory called 
    mapred.local/tasktracker/archives/machine1/user/mahadev/test.txt/test.txt
    the local directory name contains test.txt directory just to make it similar to the archive structure.

   if no dfs://hostname:port is specified (eg : setcachefiles(/user/mahadev/test.txt)), in that case it is stored in 
  mapred.local/tasktracker/archives/hostname_of_dfs_usedby_mapred/user/mahadev/test.txt

v) The archives are localized only once and checked for each task if they are fresh and need to be refresed or not.
   This is done using md5 checksum of the .crc files for the archives.
   
   Steps:
    a) When a job is submitted, the md5 checksums of the required archives/files in dfs are calculated and are written into the 
       jobconf.
    b) when a task is executing, it matches this md5 to the md5 of the localized cache (stored in memory after it has been localized). If they match its fine to go ahead with this archive.
       If it does not match then the md5 of the .crc of the file in dfs is calculated. If this does not match then the archives have been changed since the job has been submitted, so the tasks fail with this error. If they do match then the cache is refreshed again. 

    c) Two jobs can use the same archives in parallel, but if the second job updates the same archive and tries using the updated archive, then it will fail.
    
vi) How to get the localized cache paths
   An api in the jobconf called jobconf.getLocallizedCacheArchives gives a comma seperated list of localized path of the archives in the same order they had been asked to be localized.
  Also, you can use names for archives. So you could do something like:
 setcachearchives(x=somearchive)
and in the maps/reduces do conf.getNamedCache(x) and it will return the localized path of the cache named x.
vii) Restrictions: 
  Currently only *.zip and *.jar are only supported for archives. 

viii) Also, caching across tasktracker going up and down is not supported. So a tasktracker would lose all caching information once it goes down. The caching information can be reconstructed when the task tracker comes up but the support is not available in this patch.

ix) When are the caches deleted?
    A soft limit on the cache directory is a configuration parameter in the hadoop-default set to 10GB. So whenever the cache directory size goes beyond this size the framework will try deleting local caches that are not being used.





> RFC: Efficient file caching
> ---------------------------
>
>                 Key: HADOOP-288
>                 URL: http://issues.apache.org/jira/browse/HADOOP-288
>             Project: Hadoop
>          Issue Type: Bug
>            Reporter: Michel Tourn
>         Assigned To: Mahadev konar
>         Attachments: caching.patch, test.jar, test.zip
>
>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] Commented: (HADOOP-288) RFC: Efficient file caching

Posted by "paul sutter (JIRA)" <ji...@apache.org>.
    [ http://issues.apache.org/jira/browse/HADOOP-288?page=comments#action_12416826 ] 

paul sutter commented on HADOOP-288:
------------------------------------


Why not 
- leave the "archived" data in DFS,
- with its replication level set to infinite, and
- make a change to the DFS client so that it will replicate blocks of such files locally when they are accessed, and
- ensure that blocks that are local are accessed through the local file system instead of through DFS

Wouldnt that be simpler than having a whole new mechanism?



> RFC: Efficient file caching
> ---------------------------
>
>          Key: HADOOP-288
>          URL: http://issues.apache.org/jira/browse/HADOOP-288
>      Project: Hadoop
>         Type: Bug

>     Reporter: Michel Tourn
>     Assignee: Michel Tourn

>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
   http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see:
   http://www.atlassian.com/software/jira


[jira] Commented: (HADOOP-288) RFC: Efficient file caching

Posted by "Owen O'Malley (JIRA)" <ji...@apache.org>.
    [ http://issues.apache.org/jira/browse/HADOOP-288?page=comments#action_12431689 ] 
            
Owen O'Malley commented on HADOOP-288:
--------------------------------------

Jira ate my comments again. Since I didn't realize it for a day, I'll try to reconstruct them.

They were mostly nits:

1. You add 3 new mini-cluster bring up/tear down cycles in the junit tests. It would be faster to use the same cluster with multiple jobs.
2. The fields in DistributedCache should be private instead of package local.
3. The same for the string constants in TaskTracker.
4. TaskTracker.getCacheSubdir and getJobCacheSubdir should return Path's and be package local. 
5. getJobCacheSubdir should have a String jobId as a parameter and the result should be customized for that job. Furthermore, the result should be pushed through getLocalPath so that they are spread between the local dirs. Therefore, it should be given the server's conf also.
6. runningJobs should be declared as "Map runningJobs;" instead of TreeMap. You might also consider using "Map<String, RunningJob> runningJobs;"
7. appendString should probably be promoted into StringUtils as join(String[], String).
8. the catch after the calls to launchTaskForJob should probably be rolled into the body of the method rather  than repeated.
9. MapTask.java just has space changes.
10. justPaths is public and probably should be package local (and get some java doc). 
11 getCacheArchives is kind of confusing as a name since it gets both archives and files. It should also probably not be public.

> RFC: Efficient file caching
> ---------------------------
>
>                 Key: HADOOP-288
>                 URL: http://issues.apache.org/jira/browse/HADOOP-288
>             Project: Hadoop
>          Issue Type: Bug
>    Affects Versions: 0.6.0
>            Reporter: Michel Tourn
>         Assigned To: Mahadev konar
>         Attachments: caching.patch, caching.patch, test.jar, test.zip
>
>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] Updated: (HADOOP-288) RFC: Efficient file caching

Posted by "Mahadev konar (JIRA)" <ji...@apache.org>.
     [ http://issues.apache.org/jira/browse/HADOOP-288?page=all ]

Mahadev konar updated HADOOP-288:
---------------------------------

    Attachment: caching-4.patch

indented the patch to make it 2 spaces.

> RFC: Efficient file caching
> ---------------------------
>
>                 Key: HADOOP-288
>                 URL: http://issues.apache.org/jira/browse/HADOOP-288
>             Project: Hadoop
>          Issue Type: Bug
>    Affects Versions: 0.6.0
>            Reporter: Michel Tourn
>         Assigned To: Mahadev konar
>         Attachments: caching-3.patch, caching-4.patch, caching.patch, caching.patch, test.jar, test.zip
>
>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] Commented: (HADOOP-288) RFC: Efficient file caching

Posted by "Michel Tourn (JIRA)" <ji...@apache.org>.
    [ http://issues.apache.org/jira/browse/HADOOP-288?page=comments#action_12416824 ] 

Michel Tourn commented on HADOOP-288:
-------------------------------------

Hadoop proposal: file caching
updated description with more details.
----------

Efficient file caching 
(on Hadoop Task nodes, for benefit of MapReduce Tasks)

Overview
========
This document describes a mechanism for caching files or archives on taskTracker nodes' local file system.
Exporting and extracting large archives from HDFS to local filesystem is expensive.
And is often required by the applications.
Currently this would happen at the beginning of every Task of a MapReduce Job.

The purpose of the Hadoop file cache is to minimize this overhead by
preserving and reusing the extracted data

During a MapRed job there are two kinds of data being uploaded to a Hadoop cluster:
  Java code and Out-of-band data.

Java code may include libraries so this can easily get large. (megabytes)

Out-of-band data is any data used by the job, in addition to the map input or the reduce input.
For example a large dictionary of words. This can also get large (gigabytes)


There are two main kinds of cacheable files:
1. The MapReduce job jar. 
   This contains Java code and possibly out-of-band data.
2. Additional archives
   This contains out-of-band data.

The proposed solution suggests that
Cacheable files:
are stored in HDFS, and specified in the JobConf of a MapReduce job.
A special case is the job jar file, which will get cached by default.

Supported formats for cacheable files are jar, tar and gzip, 
Additional formats could be added at a future time.
Regular files are also supported


Workflow:
========
For out-of-band data, the user must first explicitly upload archives to HDFS.
This can be done using any HDFS client.
It is typical for out-of-band data to be reused across Jobs and users.

The user specifies the out-of-band data using:
JobConf.addCachedArchive() or JobConf.addCachedFile()

The user specifies the job jar as today:
JobConf.setJar("f.jar") which implicitly has the effect of:
JobConf.addCachedArchive("f.jar"). 

When a Job starts, the JobTracker does the following for each cached archive.
Compute a strong hash of the archive and store the hash in the HDFS.
To avoid reading and scanning the archive, the strong hash is based
on the existing HDFS block-CRC codes rather than on the actual content.

When a Task starts, the TaskTracker does the following for each cached archive.
Retrieve the strong hash from HDFS, compare with the hash of the local copy.
If the local hash does not exist or is different, then
  retrieve the archive, unarchive it, update the local hash.
If the archive is the job jar, then
  copy or hard-link the archive contents to the Task working directory.
Then start the TaskRunner as usual.

Once the Task is running, the user code may access the cached archive contents.
This usually happens at initialization time.
If the JobConf added the cached archive: /hdfsdir/path/f.jar
Then the task can expect to access the archive content at:
$HADOOP_CACHE/hdfsdir/path/fdir/ffile 
or maybe:
$HADOOP_CACHE/hdfsdir/path/f_jar/fdir/ffile
The second option guarantees that multiple archives 
in the same directory will not clobber each other.
The translation of f.jar to f_jar is a convention to ease the distinction of file names and directory names.


Note that in the above, the HDFS paths are mirrored on the local filesystem.
The intent is to provide namespace protection.
[i.e. the contents of hdfs1/archive.jar and hdfs2/archive.jar should not collide in the cache]
The intent is not to make cache paths interchangeable with HDFS paths. 


The variable HADOOP_CACHE is made available to the task as
a JobConf property that is dynamically set by the TaskRunner code.

Cache size control:
------------------
We cannot let the cache grow unbounded.

The cache is always up-to-date at the start of a job.
So the configurable parameter should not be the age of the cached data 
but the total size of the cache. 
The cache size is a static TaskTracker configuration parameter.

LRU (least recently used) policy:
On each Task tracker, the cache manager will measure the total size of the cache
and expire the oldest cached items. 
When a cached item is requested again in a different job, it goes back to the top.

The cached archive contents are required for the MapReduce task to function.
So when the promised cache contents cannot be provided, 
the cache manager will force a job failure 

Before new files are added to the cache, we do this size test.
If the cache size limit WOULD require to expire files...
1. .. expire files for completed jobs then everything is fine: delete them.
2. .. expire files for jobs that are already running, then the NEW job fails.
3. .. expire files for the new job then the new job fails.

Note that a file (archive) may belong to multiple jobs.

In normal use the cache size is expected to be significantly larger 
than the files requested by a single job. 
So the failure modes due to cache overflow should rarely occur.

THE END.


> RFC: Efficient file caching
> ---------------------------
>
>          Key: HADOOP-288
>          URL: http://issues.apache.org/jira/browse/HADOOP-288
>      Project: Hadoop
>         Type: Bug

>     Reporter: Michel Tourn
>     Assignee: Michel Tourn

>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
   http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see:
   http://www.atlassian.com/software/jira


[jira] Commented: (HADOOP-288) RFC: Efficient file caching

Posted by "eric baldeschwieler (JIRA)" <ji...@apache.org>.
    [ http://issues.apache.org/jira/browse/HADOOP-288?page=comments#action_12417955 ] 

eric baldeschwieler commented on HADOOP-288:
--------------------------------------------

Option #3 seems like the simplest to implement, maintain and explain.  
Would this blow anything up in nutch?  

We could of course provide a flag that causes the backwards compatible copy for a release or two if really needed.

> RFC: Efficient file caching
> ---------------------------
>
>          Key: HADOOP-288
>          URL: http://issues.apache.org/jira/browse/HADOOP-288
>      Project: Hadoop
>         Type: Bug

>     Reporter: Michel Tourn
>     Assignee: Michel Tourn

>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
   http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see:
   http://www.atlassian.com/software/jira


[jira] Commented: (HADOOP-288) RFC: Efficient file caching

Posted by "Doug Cutting (JIRA)" <ji...@apache.org>.
    [ http://issues.apache.org/jira/browse/HADOOP-288?page=comments#action_12432634 ] 
            
Doug Cutting commented on HADOOP-288:
-------------------------------------

The indentation of this patch is non-standard.  Please use 2 spaces per indent level, no tabs.

Should the JobConf setters be adders?  For example, should setCacheFiles(String) instead be named addCacheFile(Path)?  Also, should we use paths instead of strings?

> RFC: Efficient file caching
> ---------------------------
>
>                 Key: HADOOP-288
>                 URL: http://issues.apache.org/jira/browse/HADOOP-288
>             Project: Hadoop
>          Issue Type: Bug
>    Affects Versions: 0.6.0
>            Reporter: Michel Tourn
>         Assigned To: Mahadev konar
>         Attachments: caching-3.patch, caching.patch, caching.patch, test.jar, test.zip
>
>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] Updated: (HADOOP-288) RFC: Efficient file caching

Posted by "Mahadev konar (JIRA)" <ji...@apache.org>.
     [ http://issues.apache.org/jira/browse/HADOOP-288?page=all ]

Mahadev konar updated HADOOP-288:
---------------------------------

    Attachment: caching-5.patch

Made the suggested changes by doug. I moved all the public methods in jobconf for caching into DistributedCache. Mapreduce is now a client of DistributedCache class. Also, the public api now uses URI to addcachefiles/archives. DistributedCache is seperate package.

So the api now looks like 
DistrbutedCache.addCacheArchives(uri, jobconf)

and to get the localized Paths 
DistibutedCache.getLocalArchives(Configuration conf) gives an array of  localized path directory of cache archives in the order they were added.  


> RFC: Efficient file caching
> ---------------------------
>
>                 Key: HADOOP-288
>                 URL: http://issues.apache.org/jira/browse/HADOOP-288
>             Project: Hadoop
>          Issue Type: Bug
>    Affects Versions: 0.6.0
>            Reporter: Michel Tourn
>         Assigned To: Mahadev konar
>         Attachments: caching-3.patch, caching-4.patch, caching-5.patch, caching.patch, caching.patch, test.jar, test.zip
>
>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] Updated: (HADOOP-288) RFC: Efficient file caching

Posted by "Mahadev konar (JIRA)" <ji...@apache.org>.
     [ http://issues.apache.org/jira/browse/HADOOP-288?page=all ]

Mahadev konar updated HADOOP-288:
---------------------------------

    Attachment: caching.patch

Incorporated doug's suggestions in this new patch.

> RFC: Efficient file caching
> ---------------------------
>
>                 Key: HADOOP-288
>                 URL: http://issues.apache.org/jira/browse/HADOOP-288
>             Project: Hadoop
>          Issue Type: Bug
>    Affects Versions: 0.6.0
>            Reporter: Michel Tourn
>         Assigned To: Mahadev konar
>         Attachments: caching.patch, caching.patch, test.jar, test.zip
>
>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] Commented: (HADOOP-288) RFC: Efficient file caching

Posted by "Michel Tourn (JIRA)" <ji...@apache.org>.
    [ http://issues.apache.org/jira/browse/HADOOP-288?page=comments#action_12417918 ] 

Michel Tourn commented on HADOOP-288:
-------------------------------------

A problematic choice must be made to implement the following.
--
Currently without caching: in Hadoop the Task working directory contains the expanded contents of the job jar. 
Later with caching: the Task working directory contents should be efficiently created from the filecache contents.
--
So how to synchronize the task working directory job jar data with the file cache?
Or how to work around the need to do this?

Some options follow, all have problems. 
Which one is best?
---------------------------------------
Option 1. Symbolic links
This includes Symbolic links to jar files.
It is probably brittle to have classpath elements as symbolic links.
But cross-platform support is a little easier than hard links (next)
--
Option 2. Hard links
Problematic for cross-platform support. Both NTFS and UFS can create hard-linked files.
At best it works on all platforms but requires launching native/cygwin tools.
And possibly cygwin is not good enough to handle hard links on NTFS.
--
Option 2.5 File copy as a fallback mechanism for Option 2.
Problem is that this is slower and partially defeats the purpose of caching.
--
Option 3. 
change the MapReduce Job jar location.
The current convention is "working directory" contains MapReduce code jar and resources.
The modified convention would be that this directory is a parameter (pointing to a specific directory in the archive cache area)
 This new parameter could be exposed to the MapRed job as JobConf param, as a system property or as environment variable.
Upside: pure java. 
Downside: not fully backward compatible
--


> RFC: Efficient file caching
> ---------------------------
>
>          Key: HADOOP-288
>          URL: http://issues.apache.org/jira/browse/HADOOP-288
>      Project: Hadoop
>         Type: Bug

>     Reporter: Michel Tourn
>     Assignee: Michel Tourn

>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
   http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see:
   http://www.atlassian.com/software/jira


[jira] Commented: (HADOOP-288) RFC: Efficient file caching

Posted by "Mahadev konar (JIRA)" <ji...@apache.org>.
    [ http://issues.apache.org/jira/browse/HADOOP-288?page=comments#action_12433450 ] 
            
Mahadev konar commented on HADOOP-288:
--------------------------------------


ok, this is what I plan to do. Doug, please comment if you are ok with this -- 

1) DistributedCache class will be seperate package - org.apache.hadoop.filecache;

2) To make it idependent of mapreduce -- 
DistributedCache will have methods -- 
Path localizeCacheArchives(URI of archive to localize, conf)
-- this will return the Path of the localized archive directory
Path localizeCacheFiles(URI of file to localize, conf)
-- this will return the Path of the localized file

The difference between these two methods being that localizeCacheArchives automatically unarchives a zip/jar file while localizeCacheFiles just copies the file locally.

3) DistributedCache maintains a list of localized files/caches so as to copy only once.

4) The TaskRunner is client of DistributedCache asking to cache files locally.

5) JobConf does not have any api's related to setCache/addCache. These will be seperate static methods in DistributedCache to setCaches/files in jobconf. The getCaches can also be impelmented as a static method in DistributedCache.

Comments?

> RFC: Efficient file caching
> ---------------------------
>
>                 Key: HADOOP-288
>                 URL: http://issues.apache.org/jira/browse/HADOOP-288
>             Project: Hadoop
>          Issue Type: Bug
>    Affects Versions: 0.6.0
>            Reporter: Michel Tourn
>         Assigned To: Mahadev konar
>         Attachments: caching-3.patch, caching-4.patch, caching.patch, caching.patch, test.jar, test.zip
>
>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] Updated: (HADOOP-288) RFC: Efficient file caching

Posted by "Mahadev konar (JIRA)" <ji...@apache.org>.
     [ http://issues.apache.org/jira/browse/HADOOP-288?page=all ]

Mahadev konar updated HADOOP-288:
---------------------------------

               Status: Patch Available  (was: Open)
    Affects Version/s: 0.6.0

> RFC: Efficient file caching
> ---------------------------
>
>                 Key: HADOOP-288
>                 URL: http://issues.apache.org/jira/browse/HADOOP-288
>             Project: Hadoop
>          Issue Type: Bug
>    Affects Versions: 0.6.0
>            Reporter: Michel Tourn
>         Assigned To: Mahadev konar
>         Attachments: caching.patch, test.jar, test.zip
>
>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] Commented: (HADOOP-288) RFC: Efficient file caching

Posted by "Mahadev konar (JIRA)" <ji...@apache.org>.
    [ http://issues.apache.org/jira/browse/HADOOP-288?page=comments#action_12430288 ] 
            
Mahadev konar commented on HADOOP-288:
--------------------------------------

You are right Doug. The only public api's should be the jobconf methods. I will incorporate your changes and resubmit the patch. Thanks for your comments.

> RFC: Efficient file caching
> ---------------------------
>
>                 Key: HADOOP-288
>                 URL: http://issues.apache.org/jira/browse/HADOOP-288
>             Project: Hadoop
>          Issue Type: Bug
>    Affects Versions: 0.6.0
>            Reporter: Michel Tourn
>         Assigned To: Mahadev konar
>         Attachments: caching.patch, test.jar, test.zip
>
>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] Commented: (HADOOP-288) RFC: Efficient file caching

Posted by "Doug Cutting (JIRA)" <ji...@apache.org>.
    [ http://issues.apache.org/jira/browse/HADOOP-288?page=comments#action_12430084 ] 
            
Doug Cutting commented on HADOOP-288:
-------------------------------------

The LocalDU class should rather be a method on FileUtils, as should the UnZip utility.

DistributedCache should not be public, nor should the methods you add to TaskTracker.  The only public API for this new feature is the two new JobConf methods, right?



> RFC: Efficient file caching
> ---------------------------
>
>                 Key: HADOOP-288
>                 URL: http://issues.apache.org/jira/browse/HADOOP-288
>             Project: Hadoop
>          Issue Type: Bug
>    Affects Versions: 0.6.0
>            Reporter: Michel Tourn
>         Assigned To: Mahadev konar
>         Attachments: caching.patch, test.jar, test.zip
>
>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] Updated: (HADOOP-288) RFC: Efficient file caching

Posted by "Mahadev konar (JIRA)" <ji...@apache.org>.
     [ http://issues.apache.org/jira/browse/HADOOP-288?page=all ]

Mahadev konar updated HADOOP-288:
---------------------------------

    Attachment: caching-3.patch

incorporated most of owen's comments, except that of merging all my junit tests into other junit tests. I merged my local/minimr junit test into TestMiniMRLocalFS.  I tried including my caching junit test with DFS and MiniMR in TestMiniMRWIthDFS but that made it longer htan 3 mins. It currently takes arnd 140 seconds. So I have a seperate junit test for DFS and MiniMR.

> RFC: Efficient file caching
> ---------------------------
>
>                 Key: HADOOP-288
>                 URL: http://issues.apache.org/jira/browse/HADOOP-288
>             Project: Hadoop
>          Issue Type: Bug
>    Affects Versions: 0.6.0
>            Reporter: Michel Tourn
>         Assigned To: Mahadev konar
>         Attachments: caching-3.patch, caching.patch, caching.patch, test.jar, test.zip
>
>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] Commented: (HADOOP-288) RFC: Efficient file caching

Posted by "Mahadev konar (JIRA)" <ji...@apache.org>.
    [ http://issues.apache.org/jira/browse/HADOOP-288?page=comments#action_12432685 ] 
            
Mahadev konar commented on HADOOP-288:
--------------------------------------

about setcacheFiles(), we could add a addcacheFIles though I have not done it in the patch. Also, we are using URI's (dfs://hostname:port/path) so I dont think we should be using paths instead of strings.

> RFC: Efficient file caching
> ---------------------------
>
>                 Key: HADOOP-288
>                 URL: http://issues.apache.org/jira/browse/HADOOP-288
>             Project: Hadoop
>          Issue Type: Bug
>    Affects Versions: 0.6.0
>            Reporter: Michel Tourn
>         Assigned To: Mahadev konar
>         Attachments: caching-3.patch, caching-4.patch, caching.patch, caching.patch, test.jar, test.zip
>
>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] Updated: (HADOOP-288) RFC: Efficient file caching

Posted by "Doug Cutting (JIRA)" <ji...@apache.org>.
     [ http://issues.apache.org/jira/browse/HADOOP-288?page=all ]

Doug Cutting updated HADOOP-288:
--------------------------------

    Status: Open  (was: Patch Available)

The documentation of the primary user API should fully describe the format of the parameters, e.g., that these are strings representing URIs.  This would be simpler if the API used a type other than String.

We need an addCacheFile() method before we need a setCacheFiles() method, not the other way around.

I'd like to see this as independent of the mapred core as is possible, in order to support things like HADOOP-452.  In particular, the DistributedCache class should probably move to a separate package (filecache)?, and as much of the functionality as is possible should be put in that package.  The TaskTracker and JobTracker should become clients of this facility.

Utilities to store a set of files to cache in a Configuration should arguably move to the new package as well.  There's a temptation to overload JobConf that we need to resist.  So I'd rather see these as static methods like DistributedCache.addFile(Configuration, URI).


> RFC: Efficient file caching
> ---------------------------
>
>                 Key: HADOOP-288
>                 URL: http://issues.apache.org/jira/browse/HADOOP-288
>             Project: Hadoop
>          Issue Type: Bug
>    Affects Versions: 0.6.0
>            Reporter: Michel Tourn
>         Assigned To: Mahadev konar
>         Attachments: caching-3.patch, caching-4.patch, caching.patch, caching.patch, test.jar, test.zip
>
>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] Commented: (HADOOP-288) RFC: Efficient file caching

Posted by "Michel Tourn (JIRA)" <ji...@apache.org>.
    [ http://issues.apache.org/jira/browse/HADOOP-288?page=comments#action_12416978 ] 

Michel Tourn commented on HADOOP-288:
-------------------------------------

Yes, staying within the context of DFS could be simpler. 
Note however that we have these requirements:
1. archive files are sometimes used by non-Java non-Hadoop MapReduce programs (using http://wiki.apache.org/lucene-
hadoop/HadoopStreaming) 
2. avoid repetitive expansion of the job jar and of other archives for each Task in the Job.
3. In case of many small files, avoid a per-file overhead for DFS and cache operations.

Because of 1. the files must really be native OS files, not DFS files. 
For such general tools, the "common-denominator API" is only: the base directory for the cache.

Today, unarchiving the job jar occurs in Hadoop, not in the MapRed application. But it is not cached.
Because of 2. and 3. the unarchiving process itself must be cacheable.
So unarchiving must occur in the Hadoop framework, not the MapRed application.


> RFC: Efficient file caching
> ---------------------------
>
>          Key: HADOOP-288
>          URL: http://issues.apache.org/jira/browse/HADOOP-288
>      Project: Hadoop
>         Type: Bug

>     Reporter: Michel Tourn
>     Assignee: Michel Tourn

>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> The file may become unavailable for a short period of time and some tasks fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
   http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see:
   http://www.atlassian.com/software/jira