Posted to common-user@hadoop.apache.org by Shi Yu <sh...@uchicago.edu> on 2011/12/15 23:39:15 UTC

DistributedCache in NewAPI on 0.20.X branch

Hi,

I am using the 0.20.X branch.  I need to use the new API because it 
has the cleanup(context) method in Mapper, but I am confused about 
how to load the cached files in the mapper.  I can load the 
DistributedCache files using the old API (JobConf), but with the new API it 
always returns null.  I read in some previous discussions that "on 0.20.X 
branch, calling DistributedCache using old API is encouraged."  My 
question is: is it possible to use DistributedCache with the new API, or 
is the only option to upgrade to a higher version?  Or am I using a 
wrong method in my code?  Thanks!


The relevant code is something as follows:

class Map extends Mapper<K1, V1, K2, V2> {

    public void setup(Mapper<K1, V1, K2, V2>.Context context) throws IOException {
        try {
            Path[] localFiles =
                DistributedCache.getLocalCacheFiles(context.getConfiguration());
            String file1 = localFiles[0].toUri().getPath();   // returns null?
            ....
        }
    }
}

public int run(String[] args) throws IOException, URISyntaxException {
    ...
    // set the configuration

    Job job = new Job(conf, "Myjob");
    DistributedCache.addCacheFile(new URI("hdfs://server:9000/file1"),
        job.getConfiguration());
}
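
(For contrast, the old-API route described above as working, reading the 
localized cache paths from the JobConf in configure(), might look roughly 
like the sketch below. OldApiCacheMapper and its output key are illustrative 
names, not code from this thread.)

import java.io.IOException;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class OldApiCacheMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

    private String cachedPath;

    @Override
    public void configure(JobConf job) {
        try {
            // old API: the local copies made by the framework are listed in the JobConf
            Path[] localFiles = DistributedCache.getLocalCacheFiles(job);
            if (localFiles != null && localFiles.length > 0) {
                cachedPath = localFiles[0].toUri().getPath();
            }
        } catch (IOException e) {
            throw new RuntimeException("could not read distributed cache", e);
        }
    }

    public void map(LongWritable key, Text value,
                    OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        output.collect(new Text("cachedPath"), new Text(String.valueOf(cachedPath)));
    }
}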

BR,


Shi


Re: DistributedCache in NewAPI on 0.20.X branch

Posted by Shi Yu <sh...@uchicago.edu>.
Thank you Bejoy!

Following your code examples, it finally works. 

Actually I only changed two places in my original code.  First, 
I added the @Override annotation.  Second, I added a catch for 
FileNotFoundException, and now it works!
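
(Applied to the setup() method from the complete code posted later in this 
page, those two changes would look roughly like the sketch below. The word 
and sLogger fields are the ones from that posted Map class; everything else 
is illustrative, not the actual final code.)

@Override
public void setup(Context context) {
    String line;
    try {
        // "-files textFile.txt" places the file in the task's working directory
        BufferedReader bf = new BufferedReader(new FileReader("textFile.txt"));
        while ((line = bf.readLine()) != null) {
            word = new Text(line);
        }
        bf.close();
    } catch (FileNotFoundException e) {
        // the newly added catch: the cache file was not localized
        sLogger.error("textFile.txt not found in the working directory", e);
    } catch (IOException ioe) {
        sLogger.error(ioe.toString());
    }
}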

I appreciate your kind and precise help. 

Best,

Shi

Re: DistributedCache in NewAPI on 0.20.X branch

Posted by Bejoy Ks <be...@gmail.com>.
Hi Shi
         My bad, the syntax I posted last time was not the right one;
sorry, it was sent from my handheld.

@Override
public void setup(Context context)
{
            File file = new File("TestFile.txt");
.
.
.
}

I didn't get a chance to debug your code, but if you are looking for a
working example of DistributedCache using the new API, please find the
files below:

DistCacheTest.java - http://pastebin.com/PkdXrDgc
DistCacheTestMapper.java - http://pastebin.com/EcE3kEQW

I had a working sample with me, just pasted your logic in there and tested
it on my cluster. It was working fine for me.
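
(For readers who cannot open the pastebin links, here is a minimal sketch of
the same approach: the new API plus a file shipped with -files and read by
its plain name in setup(). It is not the actual pastebin content; the
DistCacheSketch and CacheMapper names are illustrative, while textFile.txt
and the ("test", word) output follow the code posted in this thread.)

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DistCacheSketch extends Configured implements Tool {

    public static class CacheMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text word = new Text();

        @Override
        protected void setup(Context context) throws IOException {
            // "-files textFile.txt" makes the file available in the task's
            // working directory under its own name, so plain java.io works
            BufferedReader reader = new BufferedReader(new FileReader("textFile.txt"));
            String line;
            while ((line = reader.readLine()) != null) {
                word.set(line);   // keeps the last line, as in the thread's code
            }
            reader.close();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(new Text("test"), word);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // ToolRunner has already folded -files and the other generic options
        // into getConf(), so the Job just uses that Configuration
        Job job = new Job(getConf(), "DistCacheSketch");
        job.setJarByClass(DistCacheSketch.class);
        job.setMapperClass(CacheMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new DistCacheSketch(), args));
    }
}

It would be submitted the same way as the command shown later in the thread,
e.g. bin/hadoop jar myjar.jar DistCacheSketch -files textFile.txt /input/ /output/.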

Regards
Bejoy.K.S

Re: DistributedCache in NewAPI on 0.20.X branch

Posted by Shi Yu <sh...@uchicago.edu>.
Following up on my previous question, I am putting the complete code 
below; I wonder whether there is any way to get this working on 
0.20.X using the new API.

The command I executed was: 

bin/hadoop jar myjar.jar FileTest -files textFile.txt /input/ 
/output/

The complete code: 

public class FileTest extends Configured implements Tool {
    private static final Logger sLogger = Logger.getLogger(FileTest.class);

    public static class Map extends
            org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text> {
        Text word;

        public void setup(
                org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text>.Context context) {
            String line;
            try {
                File z1_file = new File("textFile.txt");
                BufferedReader bf = new BufferedReader(new FileReader(z1_file));
                while ((line = bf.readLine()) != null) {
                    word = new Text(line);
                }
            } catch (IOException ioe) {
                sLogger.error(ioe.toString());
            }
        }

        public void map(LongWritable key, Text value,
                org.apache.hadoop.mapreduce.Mapper.Context context)
                throws IOException, InterruptedException {
            context.write(new Text("test"), word);
        }
    }

    public int run(String[] args) throws IOException, URISyntaxException {
        GenericOptionsParser parser = new GenericOptionsParser(args);
        Configuration conf = parser.getConfiguration();
        String[] otherArgs = parser.getRemainingArgs();
        Job job = new Job(conf, "MyJob");

        Path in = new Path(otherArgs[0]);
        Path out = new Path(otherArgs[1]);

        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, in);
        org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, out);

        job.setJarByClass(FileTest.class);
        job.setMapperClass(FileTest.Map.class);

        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        try {
            System.exit(job.waitForCompletion(true) ? 0 : 1);
            return 0;
        } catch (Throwable e) {
            sLogger.error("Job failed ", e);
            return -1;
        }
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new FileTest(), args);
        System.exit(exitCode);
    }
}

Re: DistributedCache in NewAPI on 0.20.X branch

Posted by Shi Yu <sh...@uchicago.edu>.
Hi,

Thanks for the response. I am still confused because the first 
solution does not seem to work.  The second solution uses the 
deprecated JobConf class.  Is there any way to do it using the 
new API, like the following code?  I tried simply replacing the 
deprecated API with the new API, but it did not work.

public void setup(Context context)  {
    // How ?
}


Shi

Re: DistributedCache in NewAPI on 0.20.X branch

Posted by Bejoy Ks <be...@gmail.com>.
Hi Shi
         Try out the following, it could get things working.
 Use DistributedCache.getCacheFiles() instead of
DistributedCache.getLocalCacheFiles().

public void setup(JobConf job)
{
            DistributedCache.getLocalCacheFiles(job)
.
.
.
}

If that also doesn't seem to work, and you have the name of the file you
put on the cache handy, just use it the normal Java way:

job submission:
hadoop jar ............    -files /home/x/y/testFile.txt <input> <output>

in your setup() method, just retrieve the files like
public void setup(JobConf job)
{
       File file = new File("testFile.txt");
}
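
(Translated to the new API, where setup() gets a Context rather than a
JobConf, that first suggestion would look roughly like the sketch below.
It assumes the job registered a file with DistributedCache.addCacheFile()
as in the original post; Configuration is org.apache.hadoop.conf.Configuration
and URI is java.net.URI.)

@Override
protected void setup(Context context) throws IOException {
    Configuration conf = context.getConfiguration();
    // getCacheFiles() returns the URIs registered with addCacheFile(),
    // while getLocalCacheFiles() returns the paths of the local copies
    URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
    if (cacheFiles != null && cacheFiles.length > 0) {
        String file1 = cacheFiles[0].getPath();
        // read file1 from HDFS here, or fall back to the local copies
    }
}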


Regards
Bejoy.K.S
