Posted to mapreduce-user@hadoop.apache.org by 谭军 <ta...@163.com> on 2011/09/21 16:54:10 UTC

How do I set the intermediate output path when I use 2 mapreduce jobs?

Hi,
I want to run 2 MR jobs sequentially.
The first job writes its intermediate result to a temp file.
The second job reads the result from that temp file rather than from the FileInputPath.
I tried this, but a FileNotFoundException was reported.
I then checked the datanodes, and the temp file had been created.
The first job executed correctly.
Why can't the second job find the file? The file was created before the second job was executed.
Thanks!


--


Regards!

Jun Tan
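
A possible explanation, not confirmed anywhere in this thread: a MapReduce job's output path is a directory, and the reducers write their results into it as part files (a later reply reads out + "/part-r-00000"). Opening the output path itself as if it were a single file can therefore fail even though the first job succeeded. The sketch below uses only java.nio on the local filesystem as a stand-in for HDFS; the class name PartFiles, its two methods, the sample file contents and the _SUCCESS marker are illustrative assumptions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PartFiles {
    // Returns the names of the part files inside a job output directory, sorted.
    // Entries that do not match the "part-*" glob (marker files, logs) are skipped.
    public static List<String> listPartFileNames(Path outputDir) {
        List<String> names = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(outputDir, "part-*")) {
            for (Path p : stream) {
                names.add(p.getFileName().toString());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Collections.sort(names);
        return names;
    }

    // Builds a local directory laid out the way a first job's output might look.
    public static Path sampleOutputDir() {
        try {
            Path out = Files.createTempDirectory("job1-out");
            Files.write(out.resolve("part-r-00000"), "a\t1\n".getBytes());
            Files.write(out.resolve("part-r-00001"), "b\t2\n".getBytes());
            Files.write(out.resolve("_SUCCESS"), new byte[0]); // hypothetical marker file
            return out;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Prints one line per part file; the marker file is not listed.
        for (String name : listPartFileNames(sampleOutputDir())) {
            System.out.println(name);
        }
    }
}
```

In Hadoop itself, giving the whole output directory of the first job to FileInputFormat.addInputPath for the second job picks up every part file, so the second job does not need to name them one by one.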

Re: Re: Re: Re: How do I set the intermediate output path when I use 2 mapreduce jobs?

Posted by Swathi V <sw...@zinniasystems.com>.
Hi Jun Tan,

Yeah, surely!
Well, the code I gave uses the new API.



-- 
Regards,
Swathi.V.

Re:Re: Re: Re: How do I set the intermediate output path when I use 2 mapreduce jobs?

Posted by 谭军 <ta...@163.com>.
Hi Swathi.V.,
Thank you very much.
It's very kind of you to do that.
I think the code you gave is implemented with the old API.
I got that working several days ago. What I can't do is implement it with the new API.
I have just started MapReduce programming and have run into some problems with my code.
When you have time, we can talk online.
Thanks!


--


Regards!

Jun Tan




Re: Re: Re: How do I set the intermediate output path when I use 2 mapreduce jobs?

Posted by Swathi V <sw...@zinniasystems.com>.
Hi Jun Tan,

1. Distributed Cache in new API usage:

 // Setting up the cache for the application

     1. Copy the requisite files to the FileSystem:

     $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat
     $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip
     $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
     $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
     $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
     $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz

     2. Set up the application's JobConf:

     JobConf job = new JobConf();
     DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),
                                   job);
     DistributedCache.addCacheArchive(new URI("/myapp/map.zip"), job);
     DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
     DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar"), job);
     DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz"), job);
     DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz"), job);

     3. Use the cached files in the Mapper
<http://hadoop.apache.org/common/docs/r0.20.1/api/org/apache/hadoop/mapred/Mapper.html>
     or Reducer
<http://hadoop.apache.org/common/docs/r0.20.1/api/org/apache/hadoop/mapred/Reducer.html>:

     public static class MapClass extends MapReduceBase
     implements Mapper<K, V, K, V> {

       private Path[] localArchives;
       private Path[] localFiles;

       public void configure(JobConf job) {
         // Get the cached archives/files
         localArchives = DistributedCache.getLocalCacheArchives(job);
         localFiles = DistributedCache.getLocalCacheFiles(job);
       }

       public void map(K key, V value,
                       OutputCollector<K, V> output, Reporter reporter)
       throws IOException {
         // Use data from the cached archives/files here
         // ...
         // ...
         output.collect(key, value);
       }
     }


2. Without the distributed cache: if you are interested, I can
help you with the code.
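
The "#lookup.dat" suffix in the first addCacheFile call above is a URI fragment. To my understanding the distributed cache uses it as the name of the local link to the cached file, while the part before "#" is the path on the FileSystem. The following standard-library sketch only shows how java.net.URI splits the two; the class name CacheUriParts and its methods are made up for illustration. Note also that JobConf, MapReduceBase and OutputCollector, as used above, belong to the org.apache.hadoop.mapred package that the linked Javadoc pages point to.

```java
import java.net.URI;

public class CacheUriParts {
    // Path portion of a cache URI: where the file lives on the FileSystem.
    public static String remotePath(String cacheUri) {
        return URI.create(cacheUri).getPath();
    }

    // Fragment portion: the short name after '#', or null when there is none.
    public static String linkName(String cacheUri) {
        return URI.create(cacheUri).getFragment();
    }

    public static void main(String[] args) {
        String withFragment = "/myapp/lookup.dat#lookup.dat";
        String withoutFragment = "/myapp/map.zip";

        System.out.println(remotePath(withFragment));   // /myapp/lookup.dat
        System.out.println(linkName(withFragment));     // lookup.dat
        System.out.println(linkName(withoutFragment));  // null
    }
}
```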





-- 
Regards,
Swathi.V.

Re:Re: Re: How do I set the intermediate output path when I use 2 mapreduce jobs?

Posted by 谭军 <ta...@163.com>.
Hi Swathi.V.,
I think my code below would work:

        Configuration conf1 = new Configuration();
        Job job1 = new Job(conf1, "Retrieval1");
        job1.setJarByClass(Retrieval.class);       
        job1.addCacheFile(new URI(args[0]));   // problem here
        conf1.set("keyNodeFile", args[0]);         //try to set key node file path and get file path in mapper1
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        job1.setMapperClass(RetrievalMapper.class);
        job1.setReducerClass(RetrievalReducer.class);
        FileInputFormat.addInputPath(job1, new Path(args[1]));
        String out = args[2] + System.nanoTime();
        FileOutputFormat.setOutputPath(job1, new Path(out));
        job1.waitForCompletion(true);
       
        Configuration conf2 = new Configuration();
        Job job2 = new Job(conf2, "Retrieval2");
        job2.setJarByClass(Retrieval.class);
        conf2.set("newKeyNodeFile", out);   // try to set new key node file path and get it in mapper2
        DistributedCache.addCacheFile(new URI(out));  // problem here
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        job2.setMapperClass(RetrievalMapper2.class);
        job2.setReducerClass(RetrievalReducer2.class);
        FileInputFormat.addInputPath(job2, new Path(args[1]));
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));
        System.exit(job2.waitForCompletion(true) ? 0 : 1);

But a NullPointerException was reported when I tried to get the file through the distributed cache.
How do I use the distributed cache in the new API?
I also tried to pass the file path by setting global parameters, but that failed too.
How can I read the "args[0]" file in mapper1 and the intermediate file in mapper2 using the new API?
Thanks!



--


Regards!

Jun Tan
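
One possible cause of the NullPointerException, offered as a guess because the mapper code is not shown: Job takes a copy of the Configuration passed to its constructor, so the conf1.set(...) and conf2.set(...) calls made after new Job(...) do not reach the tasks, and looking up "keyNodeFile" in the mapper would return null. Setting the values before constructing the job, or on job1.getConfiguration(), avoids this. Separately, the DistributedCache.addCacheFile call in the earlier reply takes the configuration as a second argument, which the job2 line omits. The sketch below models only the copy semantics in plain Java; FakeJob is a hypothetical stand-in, not a Hadoop class, and "/data/keys.txt" is a made-up path.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfSnapshot {
    // Hypothetical stand-in for a job that copies its configuration on construction.
    static final class FakeJob {
        private final Map<String, String> conf;

        FakeJob(Map<String, String> source) {
            this.conf = new HashMap<>(source); // snapshot of the caller's settings
        }

        Map<String, String> getConfiguration() {
            return conf; // the copy that tasks would read
        }
    }

    // Sets the key on the caller's map AFTER the job was built: the job never sees it.
    public static String setAfterConstruction(String key, String value) {
        Map<String, String> conf = new HashMap<>();
        FakeJob job = new FakeJob(conf);
        conf.put(key, value);
        return job.getConfiguration().get(key);
    }

    // Sets the key BEFORE the job is built: the snapshot includes it.
    public static String setBeforeConstruction(String key, String value) {
        Map<String, String> conf = new HashMap<>();
        conf.put(key, value);
        FakeJob job = new FakeJob(conf);
        return job.getConfiguration().get(key);
    }

    public static void main(String[] args) {
        System.out.println(setAfterConstruction("keyNodeFile", "/data/keys.txt"));  // null
        System.out.println(setBeforeConstruction("keyNodeFile", "/data/keys.txt")); // /data/keys.txt
    }
}
```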




Re: Re: How do I set the intermediate output path when I use 2 mapreduce jobs?

Posted by Swathi V <sw...@zinniasystems.com>.
Hi Jun Tan,

Yes, I use version 0.21.0, so I used those classes. Hadoop: The Definitive
Guide has job dependency examples for 0.20.x.

Thank You,



-- 
Regards,
Swathi.V.

Re:Re: How do I set the intermediate output path when I use 2 mapreduce jobs?

Posted by 谭军 <ta...@163.com>.
Swathi.V.,
ControlledJob cannot be resolved in my Eclipse.
My Hadoop version is 0.20.2.
Can ControlledJob only be resolved in Hadoop 0.21.0 (+)?
Or do I need certain plugins?
Thanks


--


Regards!

Jun Tan




Re: How do I set the intermediate output path when I use 2 mapreduce jobs?

Posted by Swathi V <sw...@zinniasystems.com>.
Hi,

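This code might help you:
// JobDependancies.java snippet

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// WordMapper, WordReducer and SortWordMapper are user classes not shown here.
public class JobDependancies {
   public static void main(String[] args) throws Exception {
      // job1: word count, written to a uniquely named intermediate directory
      Configuration conf = new Configuration();
      Job job1 = new Job(conf, "job1");
      job1.setJarByClass(JobDependancies.class);
      job1.setMapperClass(WordMapper.class);
      job1.setReducerClass(WordReducer.class);
      job1.setOutputKeyClass(Text.class);
      job1.setOutputValueClass(IntWritable.class);
      FileInputFormat.addInputPath(job1, new Path(args[0]));
      String out = args[1] + System.nanoTime();
      FileOutputFormat.setOutputPath(job1, new Path(out));

      // job2: sort, reading job1's output and writing the final result
      Configuration conf2 = new Configuration();
      Job job2 = new Job(conf2, "job2");
      job2.setJarByClass(JobDependancies.class);
      job2.setOutputKeyClass(IntWritable.class);
      job2.setOutputValueClass(Text.class);
      job2.setMapperClass(SortWordMapper.class);
      job2.setReducerClass(Reducer.class);   // base Reducer passes records through
      // Reads only the first reducer's file; pass new Path(out) instead
      // to read every part file when job1 runs with more than one reducer.
      FileInputFormat.addInputPath(job2, new Path(out + "/part-r-00000"));
      FileOutputFormat.setOutputPath(job2, new Path(args[1]));

      // job2 is not started until job1 has finished
      ControlledJob controlledJob1 = new ControlledJob(job1.getConfiguration());
      ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());
      controlledJob2.addDependingJob(controlledJob1);

      JobControl jobControl = new JobControl("control");
      jobControl.addJob(controlledJob1);
      jobControl.addJob(controlledJob2);

      // JobControl is a Runnable: run it in its own thread and poll for completion
      Thread thread = new Thread(jobControl);
      thread.start();
      while (!jobControl.allFinished()) {
         try {
            Thread.sleep(10000);
         } catch (InterruptedException e) {
            // restore the interrupt flag and stop waiting
            Thread.currentThread().interrupt();
            break;
         }
      }
      jobControl.stop();
   }
}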


The word-count output of job1 is given as input to the sort job, job2.
Whatever the mappers and reducers are, this is the way to chain several
jobs.
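
To make the dependency idea concrete, here is a minimal plain-JDK sketch of what addDependingJob arranges: a job is started only once every job it depends on has finished. It does not use Hadoop at all. MiniJobControl, MiniJob and runAll are hypothetical names invented for illustration, and the sketch runs the jobs in the calling thread, whereas the real JobControl submits jobs to a cluster and tracks their state from its own thread.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for JobControl; not part of Hadoop.
public class MiniJobControl {

    // A named unit of work plus the jobs that must finish before it starts.
    public static class MiniJob {
        final String name;
        final Runnable work;
        final List<MiniJob> dependsOn = new ArrayList<>();

        public MiniJob(String name, Runnable work) {
            this.name = name;
            this.work = work;
        }

        // Same idea as ControlledJob.addDependingJob: 'other' must run first.
        public void addDependingJob(MiniJob other) {
            dependsOn.add(other);
        }
    }

    private final List<MiniJob> jobs = new ArrayList<>();

    public void addJob(MiniJob job) {
        jobs.add(job);
    }

    // Runs every job whose dependencies are done, repeating until all have run.
    // Returns the job names in the order they were executed.
    public List<String> runAll() {
        List<String> order = new ArrayList<>();
        Set<MiniJob> done = new HashSet<>();
        while (done.size() < jobs.size()) {
            boolean progressed = false;
            for (MiniJob job : jobs) {
                if (!done.contains(job) && done.containsAll(job.dependsOn)) {
                    job.work.run();
                    done.add(job);
                    order.add(job.name);
                    progressed = true;
                }
            }
            if (!progressed) {
                throw new IllegalStateException("circular or missing dependency");
            }
        }
        return order;
    }

    public static void main(String[] args) {
        MiniJob wordCount = new MiniJob("job1", () -> System.out.println("counting words"));
        MiniJob sort = new MiniJob("job2", () -> System.out.println("sorting counts"));
        sort.addDependingJob(wordCount);

        MiniJobControl control = new MiniJobControl();
        control.addJob(sort);        // added first on purpose
        control.addJob(wordCount);
        System.out.println(control.runAll());
    }
}
```

Even though job2 is added first, it is skipped on the first pass because job1 has not run yet, so the execution order follows the dependency rather than the order in which jobs were added.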



-- 
Regards,
Swathi.V.

Re: How do I set the intermediate output path when I use 2 mapreduce jobs?

Posted by Robert Evans <ev...@yahoo-inc.com>.
Sorry about the confusion then.  Look at the code that Swathi sent.  Your problem is probably a timing issue somewhere.  You may be launching the second job before the first one has completely finished.
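
One way to rule out such a timing problem is to block until the first job has finished before the second one is started; with the new-API Job class, job1.waitForCompletion(true) does this and returns false if the job failed. The plain-JDK sketch below only illustrates that ordering, with a thread standing in for the first job and a local temp directory standing in for the intermediate HDFS path. TwoStagePipeline, stageOne, stageTwo and run are hypothetical names, not Hadoop APIs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical two-stage pipeline; a thread stands in for the first MapReduce job.
public class TwoStagePipeline {

    // Stage 1: writes its result as part files into the intermediate directory.
    static void stageOne(Path outDir) throws IOException {
        Files.write(outDir.resolve("part-r-00000"), "apple\t2\n".getBytes(StandardCharsets.UTF_8));
        Files.write(outDir.resolve("part-r-00001"), "pear\t1\n".getBytes(StandardCharsets.UTF_8));
    }

    // Stage 2: reads every part file in the directory, not one hard-coded name.
    static List<String> stageTwo(Path inDir) throws IOException {
        List<String> lines = new ArrayList<>();
        try (DirectoryStream<Path> parts = Files.newDirectoryStream(inDir, "part-*")) {
            for (Path part : parts) {
                lines.addAll(Files.readAllLines(part, StandardCharsets.UTF_8));
            }
        }
        Collections.sort(lines);
        return lines;
    }

    // Runs stage 1 in a separate thread and starts stage 2 only after it ends.
    static List<String> run() {
        try {
            Path tmp = Files.createTempDirectory("intermediate");
            Thread first = new Thread(() -> {
                try {
                    stageOne(tmp);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            first.start();
            first.join();          // block until stage 1 has completely finished
            return stageTwo(tmp);  // only now is the intermediate output read
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Removing the join() call would let stage 2 list the directory while stage 1 may still be writing, which is the kind of race described above.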

--Bobby Evans







Re:Re: How do I set the intermediate output path when I use 2 mapreduce jobs?

Posted by 谭军 <ta...@163.com>.
Bobby Evans,
The temp files are on HDFS, not on the local file system.


--


Regards!

Jun Tan







Re: How do I set the intermediate output path when I use 2 mapreduce jobs?

Posted by Robert Evans <ev...@yahoo-inc.com>.
Jun Tan,

So you want to have the temp file on the local file system, not on HDFS?  That is not going to work, because other parts of the code (e.g. the splitter) assume that they can see the file, which they cannot if it exists only on the local file system of a remote host.  It has to be stored in HDFS or some other globally visible file system.

--Bobby Evans

