Posted to mapreduce-user@hadoop.apache.org by YouPeng Yang <yy...@gmail.com> on 2013/05/07 10:18:01 UTC
DistributedCache does not seem to copy the HDFS files to local
Hi all,
I want to use the DistributedCache to perform a replicated join on the
map side.
My Java code is shown in [1] and [2].
When I run the job, the file that I want to cache is not copied to the
local dir of my DN, so a FileNotFoundException is thrown [3].
I also checked the source code of DistributedCache.java, shown in [4].
It appears to only set a value in the conf; I see no copy phase that
caches the file to the local dir.
My questions are:
1. How can I access the DistributedCache files?
2. Does the DistributedCache itself copy the HDFS file to the local dir?
[1] My driver class:
job = new Job(conf, "join work");
DistributedCache.addCacheFile(
    new URI("hdfs://Hadoop01:8040/user/hadoop/sqoop/CMTSIFTABLE/part-m-00000"),
    job.getConfiguration());
[2] The setup function of my Mapper class:
@Override
public void setup(Context context) {
    try {
        Path[] cacheFiles =
            DistributedCache.getLocalCacheFiles(context.getConfiguration());
        // URI[] cacheFiles = DistributedCache.getCacheFiles(context.getConfiguration());
        // System.out.print("-----cacheFiles-----" + cacheFiles.length);
        if (cacheFiles != null && cacheFiles.length > 0) {
            String line;
            String[] tokens;
            BufferedReader joinReader = new BufferedReader(
                new FileReader(cacheFiles[0].toString()));
            while ((line = joinReader.readLine()) != null) {
                tokens = StringUtils.split(line, '|');
                if (tokens != null && tokens.length >= 3) {
                    joinData.put(tokens[0], tokens[1]);
                }
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
[3] The FileNotFoundException:
java.io.FileNotFoundException: file:/tmp/hadoop-hadoop/mapred/local/4990846200525419138/part-m-00000 (No such file or directory)
    at java.io.FileInputStream.open(Native Method)
    at java.io.FileInputStream.<init>(FileInputStream.java:120)
    at java.io.FileInputStream.<init>(FileInputStream.java:79)
    at java.io.FileReader.<init>(FileReader.java:41)
    at com.jhetl.hadoop.map.CMTS.IFSNMapper.setup(IFSNMapper.java:46)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:725)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:232)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
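One thing worth checking in [3]: the path in the exception message still carries the "file:" scheme, and java.io.FileReader treats its argument as a plain file system path, not as a URI. The sketch below uses only the JDK and a temporary file to show that a scheme-prefixed string may fail to resolve while the bare path works. The class name CachePathDemo and the helper toLocalPath are made up for illustration, and whether this is the actual cause of [3] is an assumption. The trace also shows LocalJobRunner, so the job appears to have run in local mode rather than on the cluster.

```java
import java.io.File;
import java.io.IOException;
import java.net.URI;

public class CachePathDemo {

    // Hypothetical helper: drop a URI scheme such as "file:" so that
    // java.io classes receive a bare file system path.
    public static String toLocalPath(String pathString) {
        URI uri = URI.create(pathString);
        // getPath() returns only the path component; a string that has
        // no scheme is returned unchanged.
        return uri.getPath();
    }

    public static void main(String[] args) throws IOException {
        // Create a real file so that both lookups refer to something that exists.
        File tmp = File.createTempFile("part-m-", ".txt");
        tmp.deleteOnExit();

        // Looks like "file:/.../part-m-123.txt", similar to the string in the trace.
        String withScheme = tmp.toURI().toString();

        // java.io.File interprets the whole string, scheme included, as a path.
        System.out.println("with scheme exists:    " + new File(withScheme).exists());
        // After stripping the scheme, the same file is found.
        System.out.println("without scheme exists: " + new File(toLocalPath(withScheme)).exists());
    }
}
```

In the mapper from [2], the analogous change would be to pass cacheFiles[0].toUri().getPath() to FileReader instead of cacheFiles[0].toString(), assuming the file has in fact been localized.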
[4] Source of DistributedCache:
/**
 * Add a file to be localized to the conf. Intended
 * to be used by user code.
 * @param uri The uri of the cache to be localized
 * @param conf Configuration to add the cache to
 * @deprecated Use {@link Job#addCacheFile(URI)} instead
 */
@Deprecated
public static void addCacheFile(URI uri, Configuration conf) {
    String files = conf.get(MRJobConfig.CACHE_FILES);
    conf.set(MRJobConfig.CACHE_FILES,
        files == null ? uri.toString() : files + "," + uri.toString());
}
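To make the behaviour of [4] concrete: the method only appends the URI to a comma-separated string stored under one configuration key, and copies nothing itself. The sketch below mimics that logic with a plain java.util.Map standing in for Configuration. The class name, the key "demo.cache.files", and the getCacheFiles helper are assumptions for illustration, not Hadoop APIs. Any localization to a local dir would have to be done elsewhere, presumably by the framework when it launches tasks and reads this list.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class CacheFilesConfDemo {

    // Stand-in key for illustration; the real code uses MRJobConfig.CACHE_FILES.
    public static final String CACHE_FILES = "demo.cache.files";

    // Same append logic as [4], with a Map in place of Configuration.
    public static void addCacheFile(URI uri, Map<String, String> conf) {
        String files = conf.get(CACHE_FILES);
        conf.put(CACHE_FILES,
            files == null ? uri.toString() : files + "," + uri.toString());
    }

    // Hypothetical reader: split the stored string back into individual URIs.
    public static String[] getCacheFiles(Map<String, String> conf) {
        String files = conf.get(CACHE_FILES);
        return files == null ? new String[0] : files.split(",");
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        addCacheFile(URI.create(
            "hdfs://Hadoop01:8040/user/hadoop/sqoop/CMTSIFTABLE/part-m-00000"), conf);
        // Only the configuration entry changes; no file is read or copied here.
        for (String uri : getCacheFiles(conf)) {
            System.out.println(uri);
        }
    }
}
```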