Posted to common-user@hadoop.apache.org by Shi Yu <sh...@uchicago.edu> on 2011/12/15 23:39:15 UTC
DistributedCache in NewAPI on 0.20.X branch
Hi,
I am using the 0.20.X branch, but I need the new API because its Mapper
has the cleanup(context) method. I am confused about how to load the
cached files in the mapper. I can load the DistributedCache files using
the old API (JobConf), but with the new API it always returns null. I
read in some previous discussions that "on 0.20.X branch, calling
DistributedCache using old API is encouraged." My question is: is it
possible to use DistributedCache with the new API, or is the only way
to upgrade to a higher version? Or am I using the wrong method in my
code? Thanks!
The relevant code is roughly as follows:
class Map extends Mapper<K1, V1, K2, V2> {
    public void setup(Mapper<K1, V1, K2, V2>.Context context) throws IOException {
        try {
            Path[] localFiles;
            localFiles =
                DistributedCache.getLocalCacheFiles(context.getConfiguration());
            String file1 = localFiles[0].toUri().getPath(); // returns null?
            ....
        }
    }
}
public int run(String[] args) throws IOException, URISyntaxException {
    ...
    // set the configuration
    Job job = new Job(conf, "Myjob");
    DistributedCache.addCacheFile(new URI("hdfs://server:9000/file1"),
        job.getConfiguration());
}
BR,
Shi
Re: DistributedCache in NewAPI on 0.20.X branch
Posted by Shi Yu <sh...@uchicago.edu>.
Thank you Bejoy!
Following your code examples, I finally got it working.
I only changed two places in my original code. First,
I added the @Override annotation. Second, I added a new
catch (FileNotFoundException e) clause, and now it works!
I appreciate your kind and precise help.
Best,
Shi
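The thread does not say why adding @Override made a difference. One general reason the annotation helps, sketched below with hypothetical stand-in classes (BaseMapper, TaskContext and OtherContext are illustrations, not Hadoop types): a framework calls setup() through the base-class signature, so a method whose parameter type differs is only an overload and is silently skipped. Marking the method @Override turns that silent mismatch into a compile error.

```java
class OverrideDemo {
    // Stand-ins for framework types; none of these are Hadoop classes.
    static class TaskContext {}
    static class OtherContext {}

    static class BaseMapper {
        // Default hook does nothing.
        protected void setup(TaskContext context) {}

        // The framework only ever calls the base-class signature.
        final String run(TaskContext context) {
            setup(context);
            return status();
        }

        String status() { return "default setup ran"; }
    }

    static class OverloadedMapper extends BaseMapper {
        boolean loaded = false;

        // Different parameter type: an overload, so run() never calls it.
        // Adding @Override here would be rejected by the compiler.
        protected void setup(OtherContext context) { loaded = true; }

        @Override
        String status() { return loaded ? "custom setup ran" : "default setup ran"; }
    }

    static class OverridingMapper extends BaseMapper {
        boolean loaded = false;

        @Override // compiles only because the signature matches the base class
        protected void setup(TaskContext context) { loaded = true; }

        @Override
        String status() { return loaded ? "custom setup ran" : "default setup ran"; }
    }

    public static void main(String[] args) {
        System.out.println(new OverloadedMapper().run(new TaskContext()));
        System.out.println(new OverridingMapper().run(new TaskContext()));
    }
}
```

Whether this is what happened in the code posted in this thread is not established; the sketch only shows the class of mistake the annotation guards against.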
Re: DistributedCache in NewAPI on 0.20.X branch
Posted by Bejoy Ks <be...@gmail.com>.
Hi Shi
My bad, the syntax I posted last time was not right;
sorry, I was typing from my handheld.
@Override
public void setup(Context context)
{
    File file = new File("TestFile.txt");
    .
    .
    .
}
I didn't get a chance to debug your code, but if you are looking for a
working example of Distributed Cache using the new API please find the
files below
DistCacheTest.java - http://pastebin.com/PkdXrDgc
DistCacheTestMapper.java - http://pastebin.com/EcE3kEQW
I had a working sample with me, so I just pasted your logic in there and
tested it on my cluster. It worked fine for me.
Regards
Bejoy.K.S
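The setup() fragment in the reply above opens the side file by its bare name. A minimal plain-Java sketch of that read follows, assuming (as the reply implies) that a file passed with -files shows up under its own name in the task's working directory. SideFileReader and readLines are hypothetical names, and the separate FileNotFoundException handling mirrors the catch clause mentioned elsewhere in this thread.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class SideFileReader {
    // Reads all lines of the file at the given path; a bare name resolves
    // against the current working directory. A missing file is reported on
    // stderr and yields an empty list; other read errors propagate.
    static List<String> readLines(String name) throws IOException {
        List<String> lines = new ArrayList<String>();
        File file = new File(name);
        try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (FileNotFoundException e) {
            // Printing the absolute path shows which directory was searched.
            System.err.println("Side file not found: " + file.getAbsolutePath());
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // "TestFile.txt" is the name used in the reply above.
        System.out.println(readLines("TestFile.txt").size() + " line(s) read");
    }
}
```

In a mapper, the same body would sit inside setup(Context context); printing the absolute path on failure makes it easier to tell whether the file was ever shipped to the task.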
Re: DistributedCache in NewAPI on 0.20.X branch
Posted by Shi Yu <sh...@uchicago.edu>.
Following up on my previous question, here is the complete
code. I wonder whether there is any way to get this working on
0.20.X using the new API.
The command I executed was:
    bin/hadoop jar myjar.jar FileTest -files textFile.txt /input/ /output/
The complete code:
public class FileTest extends Configured implements Tool {
    private static final Logger sLogger = Logger.getLogger(FileTest.class);

    public static class Map extends
            org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text> {
        Text word;

        public void setup(
                org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text>.Context context) {
            String line;
            try {
                File z1_file = new File("textFile.txt");
                BufferedReader bf = new BufferedReader(new FileReader(z1_file));
                while ((line = bf.readLine()) != null) {
                    word = new Text(line);
                }
            } catch (IOException ioe) {
                sLogger.error(ioe.toString());
            }
        }

        public void map(LongWritable key, Text value,
                org.apache.hadoop.mapreduce.Mapper.Context context)
                throws IOException, InterruptedException {
            context.write(new Text("test"), word);
        }
    }

    public int run(String[] args) throws IOException, URISyntaxException {
        GenericOptionsParser parser = new GenericOptionsParser(args);
        Configuration conf = parser.getConfiguration();
        String[] otherArgs = parser.getRemainingArgs();
        Job job = new Job(conf, "MyJob");

        Path in = new Path(otherArgs[0]);
        Path out = new Path(otherArgs[1]);
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, in);
        org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, out);

        job.setJarByClass(FileTest.class);
        job.setMapperClass(FileTest.Map.class);
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        try {
            System.exit(job.waitForCompletion(true) ? 0 : 1);
            return 0;
        } catch (Throwable e) {
            sLogger.error("Job failed ", e);
            return -1;
        }
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new FileTest(), args);
        System.exit(exitCode);
    }
}
Re: DistributedCache in NewAPI on 0.20.X branch
Posted by Shi Yu <sh...@uchicago.edu>.
Hi,
Thanks for the response. I am still confused because the first
solution does not seem to work, and the second solution uses the
deprecated JobConf class. Is there any way to do it using the
new API, as in the following code? I tried simply replacing the
deprecated API with the new API, but it does not work.
public void setup(Context context) {
    // How ?
}
Shi
Re: DistributedCache in NewAPI on 0.20.X branch
Posted by Bejoy Ks <be...@gmail.com>.
Hi Shi
Try the following; it could get things working.
Use DistributedCache.getCacheFiles() instead of
DistributedCache.getLocalCacheFiles():
public void setup(JobConf job)
{
    DistributedCache.getCacheFiles(job);
    .
    .
    .
}
If that also doesn't work, and you have at hand the name of the file
you put in the cache, open it the normal Java way.
Job submission:
    hadoop jar ............ -files /home/x/y/testFile.txt <input> <output>
In your setup() method, just retrieve the file like this:
public void setup(JobConf job)
{
    File file = new File("testFile.txt");
}
Regards
Bejoy.K.S
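The second suggestion above depends on knowing the bare file name to open. Here is a minimal sketch of deriving that name from a cache URI, under an assumed convention: when the URI carries a fragment (the part after #), the fragment names the local link, and otherwise the last path segment is used. CacheLinkName and linkName are hypothetical helpers, not part of the Hadoop API.

```java
import java.net.URI;
import java.net.URISyntaxException;

class CacheLinkName {
    // Hypothetical helper: the fragment if present, else the last path segment.
    static String linkName(URI uri) {
        String fragment = uri.getFragment();
        if (fragment != null && !fragment.isEmpty()) {
            return fragment;
        }
        String path = uri.getPath();
        if (path == null) {
            throw new IllegalArgumentException("URI has no path: " + uri);
        }
        return path.substring(path.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) throws URISyntaxException {
        // URIs taken from this thread, plus one with an illustrative fragment.
        System.out.println(linkName(new URI("hdfs://server:9000/file1")));
        System.out.println(linkName(new URI("hdfs://server:9000/file1#lookup.txt")));
        System.out.println(linkName(new URI("/home/x/y/testFile.txt")));
    }
}
```

The returned name is what would then be passed to new File(...) in setup().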