Posted to mapreduce-user@hadoop.apache.org by YouPeng Yang <yy...@gmail.com> on 2013/05/07 10:18:01 UTC

DistributedCache does not seem to copy the HDFS files to local

Hi All

  I want to use the DistributedCache to perform a replicated join on the
map side. My Java code follows the examples in [1] and [2].
  When I run the job, the file I want to cache is never copied into the
local directory on my DataNode, so a FileNotFoundException is thrown [3].
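
For context, the join itself is just a lookup in map() against the
joinData map that setup() fills in [2]. A minimal sketch, assuming my
input records are '|'-delimited and joinData is a HashMap<String, String>
field (the names here are illustrative, not my exact code):

@Override
protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  String[] fields = value.toString().split("\\|");
  // Replicated join: look the key up in the in-memory side table.
  String matched = joinData.get(fields[0]);
  if (matched != null) {
    context.write(new Text(fields[0]), new Text(matched));
  }
}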

  I also checked the source code of DistributedCache.java [4]. It shows
that addCacheFile() only sets up the conf; there is no copy phase that
caches the file to the local directory.

  My questions are:
 1. How can I access the DistributedCache files?
 2. Does the DistributedCache itself copy the HDFS file to the local
directory?



 [1] My driver class:
Job job = new Job(conf, "join work");
DistributedCache.addCacheFile(
    new URI("hdfs://Hadoop01:8040/user/hadoop/sqoop/CMTSIFTABLE/part-m-00000"),
    job.getConfiguration());
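
The javadoc in [4] marks addCacheFile(URI, Configuration) as deprecated
in favor of Job#addCacheFile(URI), so the non-deprecated equivalent
should be (the "#cmts" fragment is just an illustrative link name, not
part of my real path):

// Records the same URI under the job's cache-file list.
job.addCacheFile(new URI(
    "hdfs://Hadoop01:8040/user/hadoop/sqoop/CMTSIFTABLE/part-m-00000#cmts"));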

[2] The setup() method of my Mapper class:

@Override
public void setup(Context context) {
  try {
    Path[] cacheFiles =
        DistributedCache.getLocalCacheFiles(context.getConfiguration());
    // URI[] cacheFiles =
    //     DistributedCache.getCacheFiles(context.getConfiguration());
    // System.out.print("-----cacheFiles-----" + cacheFiles.length);
    if (cacheFiles != null && cacheFiles.length > 0) {
      String line;
      String[] tokens;
      BufferedReader joinReader = new BufferedReader(
          new FileReader(cacheFiles[0].toString()));
      while ((line = joinReader.readLine()) != null) {
        tokens = StringUtils.split(line, '|');
        if (tokens != null && tokens.length >= 3) {
          joinData.put(tokens[0], tokens[1]);
        }
      }
      joinReader.close();
    }
  } catch (IOException e) {
    e.printStackTrace();
  }
}
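
As an alternative read path, my understanding is that the framework
symlinks each cache file into the task's working directory under its
link name (the URI fragment if one was given, otherwise the file's base
name), so the file should also be readable directly by that name:

// Assumes the default link name, i.e. the cached file's base name.
BufferedReader joinReader =
    new BufferedReader(new FileReader("part-m-00000"));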

[3] The FileNotFoundException:

java.io.FileNotFoundException: file:/tmp/hadoop-hadoop/mapred/local/4990846200525419138/part-m-00000 (No such file or directory)
        at java.io.FileInputStream.open(Native Method)
        at java.io.FileInputStream.<init>(FileInputStream.java:120)
        at java.io.FileInputStream.<init>(FileInputStream.java:79)
        at java.io.FileReader.<init>(FileReader.java:41)
        at com.jhetl.hadoop.map.CMTS.IFSNMapper.setup(IFSNMapper.java:46)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:725)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
        at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:232)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)



[4] Source of DistributedCache.addCacheFile():
  /**
   * Add a file to be localized to the conf.  Intended
   * to be used by user code.
   * @param uri The uri of the cache to be localized
   * @param conf Configuration to add the cache to
   * @deprecated Use {@link Job#addCacheFile(URI)} instead
   */
  @Deprecated
  public static void addCacheFile(URI uri, Configuration conf) {
    String files = conf.get(MRJobConfig.CACHE_FILES);
    conf.set(MRJobConfig.CACHE_FILES, files == null ? uri.toString()
        : files + "," + uri.toString());
  }
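
So as far as I can tell, addCacheFile() only records the URI in the job
configuration; the actual copy, if it happens at all, must be done later
by the framework when it localizes the task. A minimal check (assuming
MRJobConfig.CACHE_FILES is "mapreduce.job.cache.files"):

Configuration conf = new Configuration();
DistributedCache.addCacheFile(
    new URI("hdfs://Hadoop01:8040/user/hadoop/sqoop/CMTSIFTABLE/part-m-00000"),
    conf);
// Only the URI string is recorded here; no file has been copied yet.
System.out.println(conf.get("mapreduce.job.cache.files"));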