Posted to user@hive.apache.org by Maxim Veksler <ma...@vekslers.org> on 2010/08/25 11:12:34 UTC

Directing output from Hive MR to a second custom MR job

Hello,

Any pointers on directing output from a Hive MR job as input for another
(non-Hive) MR job?

I would like to use Hive to select a subset of my data and then process
that data using my custom code.

I think the correct term is "MapReduce job chaining"?

Thank you,
Maxim.

Re: Directing output from Hive MR to a second custom MR job

Posted by Edward Capriolo <ed...@gmail.com>.
On Thu, Aug 26, 2010 at 1:33 AM, Neil Xu <ne...@gmail.com> wrote:
> Hi, Maxim,
>
>     I misunderstood what you want: you need a job chain in which an MR
> job (not Hive) is automatically run after a Hive job is done, and the
> temp files are also cleaned up automatically?
>     I have no idea either, but in our company a scheduling system is
> implemented to manage different kinds of jobs; for example, you can set
> up the job chain in advance, and the system will call a Hive job first,
> then a shell job, or another MR job, etc.
>
>     Does anyone have some ideas?
>
> -Chocobo

I accomplish this using "hive --service jar", described here:
https://issues.apache.org/jira/browse/HIVE-617. This is not as
effective as ChainMapper/ChainReducer, but since I know where the final
output is going, I can use it in the next stage of processing.
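
For reference, a minimal sketch of what that can look like. The jar name,
class name, and paths below are hypothetical; as I understand the HIVE-617
service, it launches a user jar the way "hadoop jar" does, with the Hive
classpath already set up.

// Hypothetical driver, launched via the Hive client, e.g.:
//   hive --service jar myjobs.jar HiveOutputConsumer /user/hive/staging
// It runs a custom MR job over whatever the preceding Hive query wrote
// into the staging directory.
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HiveOutputConsumer extends Configured implements Tool {
  public int run(String[] args) throws Exception {
    JobConf job = new JobConf(getConf(), HiveOutputConsumer.class);
    job.setJobName("consume-hive-output");
    // args[0] is the directory the Hive query wrote its output to
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[0] + "_processed"));
    // mapper/reducer setup omitted; the defaults pass records through
    JobClient.runJob(job);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new HiveOutputConsumer(), args));
  }
}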

Re: Directing output from Hive MR to a second custom MR job

Posted by Neil Xu <ne...@gmail.com>.
Hi, Maxim,

    I misunderstood what you want: you need a job chain in which an MR
job (not Hive) is automatically run after a Hive job is done, and the
temp files are also cleaned up automatically?
    I have no idea either, but in our company a scheduling system is
implemented to manage different kinds of jobs; for example, you can set
up the job chain in advance, and the system will call a Hive job first,
then a shell job, or another MR job, etc.

    Does anyone have some ideas?

-Chocobo

Re: Directing output from Hive MR to a second custom MR job

Posted by Maxim Veksler <ma...@vekslers.org>.
Hi Neil,

On Wed, Aug 25, 2010 at 2:41 PM, Neil Xu <ne...@gmail.com> wrote:

> You can set the input path and output path for each job, and run jobs in
> order.

Thanks for the reply.

If I understand your suggestion, I should run a Hive job (from a shell
script, for example) that writes its output to a predefined HDFS location,
and then invoke another MR job that uses the output of the first
(Hive-based) MR job as its input.

I think this could work, but I see several problems that I would prefer to
avoid (if possible):
- First, if the first job fails, it's my own responsibility (rather than
the Hadoop framework's) to detect this and not run the second job.
- Second, I need to manually delete the output of the Hive job after the
second MR job finishes (successfully).
- Third, the batch process described requires setting up two Hadoop JVMs
(which has a high setup-time overhead).

Could these problems be addressed in some fashion? Perhaps I can somehow
"tell" Hadoop/Hive to treat the output of the first job (which will be
invoked via the Hive API; how?) as temporary, the same way it treats
temporary map-phase data before it is transferred to the reducers.

Maxim.
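
A minimal sketch of one way to handle the first two concerns from a single
driver process (the query, table name, and paths below are hypothetical):
run the Hive query as a child process, launch the custom MR job only if
Hive exited successfully, and delete the staging directory in a finally
block.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HiveThenMr {
  public static void main(String[] args) throws Exception {
    // Stage 1: Hive selects the subset into a staging directory.
    Process hive = new ProcessBuilder("hive", "-e",
        "INSERT OVERWRITE DIRECTORY '/tmp/hive_subset' "
            + "SELECT * FROM source_table WHERE dt = '2010-08-25'")
        .inheritIO().start();
    if (hive.waitFor() != 0) {
      // Concern 1: a non-zero exit code means the Hive stage failed,
      // so the second job is never launched.
      throw new RuntimeException("Hive stage failed; not running MR stage");
    }

    Configuration conf = new Configuration();
    try {
      // Stage 2: the custom MR job reads /tmp/hive_subset as its input
      // (JobConf setup as in TwoJobs below, omitted here).
    } finally {
      // Concern 2: always clean up the intermediate data.
      FileSystem.get(conf).delete(new Path("/tmp/hive_subset"), true);
    }
  }
}

The third concern (client JVM startup overhead) is harder to avoid with
this approach; the "hive --service jar" route in the reply above keeps
everything in one client JVM.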

Re: Directing output from Hive MR to a second custom MR job

Posted by Neil Xu <ne...@gmail.com>.
You can set the input path and output path for each job, and run jobs in
order.

ex. TwoJobs.java


import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*; // old (mapred) API
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TwoJobs extends Configured implements Tool {

  public static class Job1Mapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value,
        OutputCollector<Text, Text> out, Reporter r) throws IOException {
      // map logic for job 1 goes here
    }
  }

  public static class Job1Reducer extends MapReduceBase
      implements Reducer<Text, Text, Text, NullWritable> {
    public void reduce(Text key, Iterator<Text> values,
        OutputCollector<Text, NullWritable> out, Reporter r) throws IOException {
      // reduce logic for job 1 goes here
    }
  }

  public static class Job2Mapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value,
        OutputCollector<Text, Text> out, Reporter r) throws IOException {
      // map logic for job 2 goes here
    }
  }

  public static class Job2Reducer extends MapReduceBase
      implements Reducer<Text, Text, Text, NullWritable> {
    public void reduce(Text key, Iterator<Text> values,
        OutputCollector<Text, NullWritable> out, Reporter r) throws IOException {
      // reduce logic for job 2 goes here
    }
  }

  public int run(String[] args) throws Exception {
    Path inputPath1 = new Path("/input1");
    Path outputPath1 = new Path("/output1"); // job 1's output = job 2's input
    Path outputPath2 = new Path("/output2");

    JobConf job1 = new JobConf(getConf(), TwoJobs.class);
    job1.setJobName("Job1");
    // the map emits (Text, Text); the reduce emits (Text, NullWritable)
    job1.setMapOutputKeyClass(Text.class);
    job1.setMapOutputValueClass(Text.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(NullWritable.class);
    job1.setMapperClass(Job1Mapper.class);
    job1.setReducerClass(Job1Reducer.class);
    job1.setInputFormat(TextInputFormat.class);
    FileInputFormat.setInputPaths(job1, inputPath1);
    FileOutputFormat.setOutputPath(job1, outputPath1);

    // runJob() blocks until the job finishes and throws an IOException if
    // it fails, so job 2 is only submitted after job 1 succeeds.
    JobClient.runJob(job1);

    JobConf job2 = new JobConf(getConf(), TwoJobs.class);
    job2.setJobName("Job2");
    job2.setMapOutputKeyClass(Text.class);
    job2.setMapOutputValueClass(Text.class);
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(NullWritable.class);
    job2.setMapperClass(Job2Mapper.class);
    job2.setReducerClass(Job2Reducer.class);
    job2.setInputFormat(TextInputFormat.class);
    FileInputFormat.setInputPaths(job2, outputPath1); // chained input
    FileOutputFormat.setOutputPath(job2, outputPath2);

    JobClient.runJob(job2);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new TwoJobs(), args));
  }
}
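
With the main() entry point above, the whole chain runs from a single
driver invocation, e.g. "hadoop jar twojobs.jar TwoJobs" (the jar name is
hypothetical).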

Chocobo
