Posted to dev@tez.apache.org by Hui Zheng <hu...@yahoo-corp.jp> on 2015/01/23 06:13:50 UTC

Tez can't execute MRR jobs as a tez job when intermediate reduce has different input/output types

Hi,

We want to use Tez's "MultiStageMRConfigUtil" to convert MRR jobs into a single Tez job, but it doesn't work when the intermediate reduce has different input and output types. Please see the details below.

Suppose we have two MapReduce jobs that together implement an ordered-wordcount job, which counts the number of occurrences of each word and sorts the words by that count.

Job1 is a traditional wordcount job, except that its output is a <count,word> pair. We call its mapper "Mapper1" and its reducer "Reducer1".

Job2 sorts the words by number of occurrences. We call its mapper "Mapper2", which contains no logic, and its reducer "Reducer2".

As MapReduce jobs we have: Mapper1 --(shuffle)--> Reducer1 --(hdfs)--> Mapper2 --(shuffle)--> Reducer2

With "MultiStageMRConfigUtil" we want to convert this into a Tez job like: Mapper1 --(shuffle)--> Reducer1 --(shuffle)--> Reducer2

Here Reducer1 is the intermediate reduce; its input type is <Text,IntWritable> but its output type is <IntWritable,Text>.

It didn't work because the following error occurred.

 5/01/15 18:13:36 INFO mapreduce.Job: Job job_1416985127132_3432630 failed with state FAILED due to: Vertex failed, vertexName=ivertex1, vertexId=vertex_1416985127132_3432630_1_01, diagnostics=[Task failed, taskId=task_1416985127132_3432630_1_01_000000, diagnostics=[TaskAttempt 0 failed, info=[Error: Failure while running task:java.lang.ClassCastException: org.apache.hadoop.io.IntWritable cannot be cast to org.apache.hadoop.io.Text

        at org.apache.hadoop.examples.ConfigableWordCount$ConfigableIntSumReducer.reduce(ConfigableWordCount.java:71)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
        at org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.runNewReducer(ReduceProcessor.java:331)
        at org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.run(ReduceProcessor.java:143)
        at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
        at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:176)
        at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)


I found that the YARNRunner class uses stageConfs[i-1] to determine Reducer1's input when it creates the DAG edge, while the ReduceProcessor class uses stageConfs[i] to determine its (Reducer1's) input.

But the setting in stageConfs[i] actually describes Reducer1's output, not its input. ReduceProcessor should use the stageConfs[i-1] setting, as YARNRunner does. (In this case 'i' is 1.)
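
The mismatch can be modeled without any Tez or Hadoop classes. The following is only a sketch: the per-stage configs are plain maps, the constants "key.class" and "value.class" are stand-ins for TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS and TEZ_RUNTIME_VALUE_CLASS, and the class and method names are hypothetical. It assumes Mapper1 emits <Text,IntWritable> and Reducer1 emits <IntWritable,Text>, which is what the stack trace and the stage configuration in this thread suggest.

```java
import java.util.List;
import java.util.Map;

/** Plain-Java model of the two lookups; not the actual YARNRunner/ReduceProcessor code. */
class StageTypeMismatch {
    // Stand-ins for the Tez runtime key/value class settings (assumed names).
    static final String KEY_CLASS = "key.class";
    static final String VALUE_CLASS = "value.class";

    /** Types carried by the edge into stage i: read from the producer, stageConfs[i - 1]. */
    static String edgeTypes(List<Map<String, String>> stageConfs, int i) {
        Map<String, String> producer = stageConfs.get(i - 1);
        return producer.get(KEY_CLASS) + "," + producer.get(VALUE_CLASS);
    }

    /** Types the processor of stage i assumes for its input: read from its own conf, stageConfs[i]. */
    static String processorInputTypes(List<Map<String, String>> stageConfs, int i) {
        Map<String, String> own = stageConfs.get(i);
        return own.get(KEY_CLASS) + "," + own.get(VALUE_CLASS);
    }

    /** Stage 0 is Mapper1, stage 1 is Reducer1; the final Reducer2 stage is omitted for brevity. */
    static List<Map<String, String>> orderedWordCountStages() {
        return List.of(
            Map.of(KEY_CLASS, "Text", VALUE_CLASS, "IntWritable"),   // what Mapper1 emits
            Map.of(KEY_CLASS, "IntWritable", VALUE_CLASS, "Text"));  // what Reducer1 emits
    }

    public static void main(String[] args) {
        List<Map<String, String>> confs = orderedWordCountStages();
        System.out.println("edge into stage 1 delivers:  " + edgeTypes(confs, 1));
        System.out.println("stage 1 processor assumes:   " + processorInputTypes(confs, 1));
    }
}
```

When the two lookups disagree for the same stage, the reducer is handed keys of a type it did not declare, which matches the ClassCastException in the log.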

-------------------------------------------------------------------

//in createDAG() of org.apache.tez.mapreduce.client.YARNRunner.java

for (int i = 0; i < stageConfs.length; i++) {

    // stageConfs[i] is used to create the vertex (in our case a ReduceProcessor),
    // and the ReduceProcessor then also determines its input types from stageConfs[i]:
    //   Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);     // resolves to TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS (in ReduceProcessor.java)
    //   Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf); // resolves to TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS (in ReduceProcessor.java)

    vertices[i] = createVertexForStage(stageConfs[i], jobLocalResources,
        i == 0 ? mapInputLocations : reduceInputLocations, i,stageConfs.length);
}

...
// stageConfs[i-1] is used to create the edge and its input types, which should match the reducer's input.
// But the reducer determines its input from stageConfs[i] (see above), so the two may be incompatible.
OrderedPartitionedKVEdgeConfig edgeConf =
    OrderedPartitionedKVEdgeConfig.newBuilder(stageConfs[i - 1].get(
        TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS),
    stageConfs[i - 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS),
    MRPartitioner.class.getName(), partitionerConf)
    .configureInput().useLegacyInput().done()
    .setFromConfiguration(stageConfs[i - 1]).build();
Edge edge = Edge.create(vertices[i - 1], vertices[i], edgeConf.createDefaultEdgeProperty());
dag.addEdge(edge);

-------------------------------------------------------------------

ReduceProcessor cannot read stageConfs[i-1], so I simply added two settings for ReduceProcessor to read, and with them it works. (I still think the best fix is to let ReduceProcessor read stageConfs[i-1].)

-------------------------------------------------------------------

// "mapreduce.reduce.input.key.class" and "mapreduce.reduce.input.value.class" are new settings that we added.
diff src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java ReduceProcessor.java.OLD
112,113c112,113
<     Class keyClass = jobConf.getClass("mapreduce.reduce.input.key.class",null,Object.class);
<     Class valueClass = jobConf.getClass("mapreduce.reduce.input.value.class",null,Object.class);
---
>     Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
>     Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);

-------------------------------------------------------------------
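
One way the two new settings could be filled in is to copy the producer stage's output types into the consumer stage's config before the job is submitted. The sketch below shows that idea with plain maps rather than Hadoop Configuration objects. The class name, the method name and the "key.class"/"value.class" stand-in constants are hypothetical, and the thread does not say how the settings were actually populated.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch only: records the previous stage's output types as this stage's input types. */
class ReduceInputTypeWorkaround {
    // Stand-ins for the per-stage output type settings (assumed names).
    static final String OUT_KEY = "key.class";
    static final String OUT_VALUE = "value.class";
    // The two settings introduced by the patch in this message.
    static final String IN_KEY = "mapreduce.reduce.input.key.class";
    static final String IN_VALUE = "mapreduce.reduce.input.value.class";

    /** Returns a copy of the consumer conf with the producer's output types added as input types. */
    static Map<String, String> withInputTypes(Map<String, String> producer,
                                              Map<String, String> consumer) {
        Map<String, String> patched = new HashMap<>(consumer);
        patched.put(IN_KEY, producer.get(OUT_KEY));
        patched.put(IN_VALUE, producer.get(OUT_VALUE));
        return patched;
    }

    public static void main(String[] args) {
        Map<String, String> mapper1 = Map.of(
            OUT_KEY, "org.apache.hadoop.io.Text",
            OUT_VALUE, "org.apache.hadoop.io.IntWritable");
        Map<String, String> reducer1 = Map.of(
            OUT_KEY, "org.apache.hadoop.io.IntWritable",
            OUT_VALUE, "org.apache.hadoop.io.Text");
        Map<String, String> patched = withInputTypes(mapper1, reducer1);
        System.out.println(IN_KEY + " = " + patched.get(IN_KEY));
        System.out.println(IN_VALUE + " = " + patched.get(IN_VALUE));
    }
}
```

The consumer's own output settings are left untouched, so the edge to the next stage is unaffected.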


Thanks

- Hui

Re: Tez can't execute MRR jobs as a tez job when intermediate reduce has different input/output types

Posted by Siddharth Seth <ss...@apache.org>.
Using MultiStageMRConfigUtil and per-stage configs is not recommended.
https://issues.apache.org/jira/browse/TEZ-1271 is meant to remove support
for this.

That said, we have had informal discussions about supporting chains of
Map-Reduce jobs, and potentially providing a tool to convert such chains
into DAGs. Clearly, there are scenarios where people would be interested in
doing this. Would be great to get more input on this from members of the
tez community.

Thanks
- Sid

On Sun, Jan 25, 2015 at 6:50 PM, Hui Zheng <hu...@yahoo-corp.jp> wrote:

> Hi,
>
> Actually we have already many mapreduce jobs which are running in
> production environment.
> And we want to find a easy way to change MRR jobs to a tez job to avoid
> reading and writing hdfs.
> So I find that we may only change the configuration(not use
> MultiStageMRConfigUtil class directly ) to implement it.Is it obsolete or
> not recommended?
> (we use tez-0.5.2)
>
> The way is use the“mrr.intermediate.num-stages”property for a job and use
> the“mrr.intermediate.stage.STAGE_NUM”prefix for each intermediate reduce
> of the job such as follows.
>
> <property>
>   <name>mrr.intermediate.num-stages</name>
>   <value>1</value>
> </property>
>
> <property>
>   <name>mrr.intermediate.stage.1.mapreduce.job.reduce.class</name>
>   <value>org.apache.hadoop.examples.ConfigableWordCount$ConfigableIntSumReducer</value>
> </property>
>
> <property>
>   <name>mrr.intermediate.stage.1.mapreduce.map.output.key.class</name>
>   <value>org.apache.hadoop.io.IntWritable</value>
> </property>
>
> <property>
>   <name>mrr.intermediate.stage.1.mapreduce.map.output.value.class</name>
>   <value>org.apache.hadoop.io.Text</value>
> </property>
>
> Thanks
>
>
> -Hui

Re: Tez can't execute MRR jobs as a tez job when intermediate reduce has different input/output types

Posted by Hui Zheng <hu...@yahoo-corp.jp>.
Hi,

Actually, we already have many MapReduce jobs running in our production
environment, and we want to find an easy way to turn MRR jobs into a single
Tez job to avoid reading from and writing to HDFS.
I found that we may only need to change the configuration (without using the
MultiStageMRConfigUtil class directly) to do this. Is that approach obsolete
or not recommended?
(We use tez-0.5.2.)

The approach is to set the "mrr.intermediate.num-stages" property for the job
and to use the "mrr.intermediate.stage.STAGE_NUM" prefix for each intermediate
reduce of the job, as follows.

<property>
  <name>mrr.intermediate.num-stages</name>
  <value>1</value>
</property>

<property>
  <name>mrr.intermediate.stage.1.mapreduce.job.reduce.class</name>
  <value>org.apache.hadoop.examples.ConfigableWordCount$ConfigableIntSumReducer</value>
</property>

<property>
  <name>mrr.intermediate.stage.1.mapreduce.map.output.key.class</name>
  <value>org.apache.hadoop.io.IntWritable</value>
</property>

<property>
  <name>mrr.intermediate.stage.1.mapreduce.map.output.value.class</name>
  <value>org.apache.hadoop.io.Text</value>
</property>
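
To illustrate how such prefixed properties might map to one stage's settings, here is a small sketch in plain Java. It assumes the "mrr.intermediate.stage.STAGE_NUM." prefix is simply stripped to obtain that stage's own property names; whether MultiStageMRConfigUtil in tez-0.5.2 does exactly this is not confirmed here, and the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

/** Sketch only: collects the properties that carry one intermediate stage's prefix. */
class StagePrefixSketch {
    static final String STAGE_PREFIX = "mrr.intermediate.stage.";

    /** Returns the stage's properties with the "mrr.intermediate.stage.N." prefix removed. */
    static Map<String, String> stageOverrides(Map<String, String> jobProps, int stageNum) {
        // The trailing dot keeps stage 1 from also matching stage 10, 11, ...
        String prefix = STAGE_PREFIX + stageNum + ".";
        Map<String, String> overrides = new TreeMap<>();
        for (Map.Entry<String, String> e : jobProps.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                overrides.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return overrides;
    }

    public static void main(String[] args) {
        Map<String, String> jobProps = Map.of(
            "mrr.intermediate.num-stages", "1",
            "mrr.intermediate.stage.1.mapreduce.map.output.key.class",
                "org.apache.hadoop.io.IntWritable",
            "mrr.intermediate.stage.1.mapreduce.map.output.value.class",
                "org.apache.hadoop.io.Text");
        stageOverrides(jobProps, 1).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Under this reading, the stage-1 values describe what Reducer1 emits (IntWritable keys, Text values), not what it receives.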

Thanks


-Hui



On 2015/01/24 6:32, "Siddharth Seth" <ss...@apache.org> wrote:

>Adding to that, MultiStageMRConfigUtil is not meant to be used by external
>projects. Support of this mechanism for multi-stage jobs is supposed to be
>removed - noone's gotten around to doing this yet, but there's a jira open
>to remove it.
>
>Using the DAG API to set this up should be possible.

Re: Tez can't execute MRR jobs as a tez job when intermediate reduce has different input/output types

Posted by Siddharth Seth <ss...@apache.org>.
Adding to that, MultiStageMRConfigUtil is not meant to be used by external
projects. Support for this mechanism for multi-stage jobs is supposed to be
removed; no one has gotten around to doing this yet, but there's a jira open
to remove it.

Using the DAG API to set this up should be possible.

On Fri, Jan 23, 2015 at 9:20 AM, Hitesh Shah <hi...@apache.org> wrote:

> Hello Hui,
>
> Thanks for catching and reporting this issue. Before we go about looking
> at a fix for this, I will provide some background.
>
> In the early days of Tez, we tried to change existing MR jobs to support a
> chain of MRR and ended up using Config/JobConf as a way to specify
> intermediate stages and also to configure them correctly. We realized that
> writing MRR pipelines using the JobClient API was quite unwieldy and hard
> to understand as it relied on setting a bunch of configs. At some point, we
> start cleaning up the Tez API to make it more easy to write MRR jobs in a
> more easy to maintain manner. In that respect, have you had a chance to
> look at the latest OrderedWordCount code in tez-examples? It shows you how
> to write an MRR job in Tez by using Tez native APIs ( DAG, Edge, Vertex )
> instead of messing with config properties in JobConf. This might be an
> easier approach if you are considering using Tez for MRR+ pipelines.
>
> In any case, for the issue that you have seen, would you mind filing a
> jira for this ( please mention what version of Tez you are using ) and
> possibly helping us by submitting up with a patch for the fix? There was a
> function aptly named doJobClientMagic() ( removed in recent times ), that
> did a second pass over the configs and setup things correctly for the case
> that you describe. I am not sure if removing that somehow introduced this
> bug.
>
> thanks
> ― Hitesh

Re: Tez can't execute MRR jobs as a tez job when intermediate reduce has different input/output types

Posted by Hitesh Shah <hi...@apache.org>.
Hello Hui, 

Thanks for catching and reporting this issue. Before we go about looking at a fix for this, I will provide some background.

In the early days of Tez, we tried to change existing MR jobs to support a chain of MRR stages and ended up using Config/JobConf as the way to specify intermediate stages and to configure them correctly. We realized that writing MRR pipelines using the JobClient API was quite unwieldy and hard to understand, as it relied on setting a bunch of configs. At some point, we started cleaning up the Tez API to make MRR jobs easier to write and maintain. In that respect, have you had a chance to look at the latest OrderedWordCount code in tez-examples? It shows how to write an MRR job in Tez using the native Tez APIs (DAG, Edge, Vertex) instead of messing with config properties in JobConf. This might be an easier approach if you are considering using Tez for MRR+ pipelines.

In any case, for the issue that you have seen, would you mind filing a jira (please mention what version of Tez you are using) and possibly helping us by submitting a patch for the fix? There was a function aptly named doJobClientMagic() (removed in recent times) that did a second pass over the configs and set things up correctly for the case that you describe. I am not sure whether removing it somehow introduced this bug.

thanks
- Hitesh


On Jan 22, 2015, at 9:13 PM, Hui Zheng <hu...@yahoo-corp.jp> wrote:

> Hi,
> 
> We want to use "MultiStageMRConfigUtil" of Tez to convert MRR jobs into one Tez job, but it doesn't work when the intermediate reduce has different input/output types. Please see the details below.
> 
> Suppose that we have two MapReduce jobs that implement the ordered-wordcount job, which counts the occurrences of each word and sorts the words by count.
> 
> Job1 is a traditional wordcount job except that its output is a <count,word> pair. We call its mapper "Mapper1" and its reducer "Reducer1".
> 
> Job2 sorts the words by the number of occurrences. We call its mapper "Mapper2", which contains no logic, and its reducer "Reducer2".
> 
> As MapReduce jobs we have: Mapper1--(shuffle)-->Reducer1 --(hdfs)--> Mapper2 --(shuffle)--> Reducer2
> 
> With "MultiStageMRConfigUtil" we want to convert this into a Tez job such as: Mapper1--(shuffle)-->Reducer1 --(shuffle)--> Reducer2
> 
> Here Reducer1 is the intermediate reduce: its input type is <Text,IntWritable> but its output type is <IntWritable,Text>.
> 
> It didn't work; the following error occurred.
> 
> 5/01/15 18:13:36 INFO mapreduce.Job: Job job_1416985127132_3432630 failed with state FAILED due to: Vertex failed, vertexName=ivertex1, vertexId=vertex_1416985127132_3432630_1_01, diagnostics=[Task failed, taskId=task_1416985127132_3432630_1_01_000000, diagnostics=[TaskAttempt 0 failed, info=[Error: Failure while running task:java.lang.ClassCastException: org.apache.hadoop.io.IntWritable cannot be cast to org.apache.hadoop.io.Text
> 
>        at org.apache.hadoop.examples.ConfigableWordCount$ConfigableIntSumReducer.reduce(ConfigableWordCount.java:71)
>        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
>        at org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.runNewReducer(ReduceProcessor.java:331)
>        at org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.run(ReduceProcessor.java:143)
>        at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
>        at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:176)
>        at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:168)
>        at java.security.AccessController.doPrivileged(Native Method)
>        at javax.security.auth.Subject.doAs(Subject.java:415)
> 
> 
> I found that the YARNRunner class uses stageConfs[i-1] to determine Reducer1's input when it creates the DAG edge, while the ReduceProcessor class uses stageConfs[i] to determine its (Reducer1's) input.
> 
> But in fact the settings in stageConfs[i] describe Reducer1's output, not its input. ReduceProcessor should use the settings from stageConfs[i-1], as YARNRunner does. (In this case 'i' is 1.)
> 
> -------------------------------------------------------------------
> 
> //in createDAG() of org.apache.tez.mapreduce.client.YARNRunner.java
> 
> for (int i = 0; i < stageConfs.length; i++) {
> 
>    //  use stageConfs[i] to create the vertex (in our case a ReduceProcessor)
>    //  the ReduceProcessor is then created and its input is also determined by stageConfs[i]
>    //  Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);  //it will be TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS in ReduceProcessor.java
>    //  Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);  //it will be TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS in ReduceProcessor.java
> 
>    vertices[i] = createVertexForStage(stageConfs[i], jobLocalResources,
>        i == 0 ? mapInputLocations : reduceInputLocations, i,stageConfs.length);
> }
> 
> ...
> // use stageConfs[i-1] to create the edge and its input, which should match the reducer's input
> // but the reducer's input is taken from stageConfs[i] (see above), so the two may be incompatible.
> OrderedPartitionedKVEdgeConfig edgeConf =
>    OrderedPartitionedKVEdgeConfig.newBuilder(stageConfs[i - 1].get(
>        TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS),
>    stageConfs[i - 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS),
>    MRPartitioner.class.getName(), partitionerConf)
>    .configureInput().useLegacyInput().done()
>    .setFromConfiguration(stageConfs[i - 1]).build();
> Edge edge = Edge.create(vertices[i - 1], vertices[i], edgeConf.createDefaultEdgeProperty());
> dag.addEdge(edge);
> 
> -------------------------------------------------------------------
> 
> ReduceProcessor can't read stageConfs[i-1], so I simply added two settings for ReduceProcessor to read. With that change it works (though I think the best fix is to let ReduceProcessor read stageConfs[i-1]).
> 
> -------------------------------------------------------------------
> 
> //"mapreduce.reduce.input.key.class" and "mapreduce.reduce.input.value.class" are the new settings added by us.
> diff src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java ReduceProcessor.java.OLD
> 112,113c112,113
> <     Class keyClass = jobConf.getClass("mapreduce.reduce.input.key.class",null,Object.class);
> <     Class valueClass = jobConf.getClass("mapreduce.reduce.input.value.class",null,Object.class);
> ---
>>    Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
>>    Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);
> 
> -------------------------------------------------------------------
> 
> 
> Thanks
> 
> - Hui
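
To make the indexing mismatch in the quoted report concrete, here is a minimal sketch in plain Java. It is a toy model, not Tez code: the class StageTypeModel, its methods, and the two key names are all hypothetical, and plain maps stand in for the per-stage configs. It assumes Mapper1 emits <Text,IntWritable> as in a conventional wordcount and Reducer1 emits <IntWritable,Text>, as the report says of Job1's output. Reducer2 is left out because its types do not affect the point.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: plain maps stand in for the per-stage configs, and the
// key names below are hypothetical, not real Tez or MapReduce settings.
public class StageTypeModel {
    static final String OUT_KEY = "example.output.key.class";
    static final String OUT_VALUE = "example.output.value.class";

    // Build one stage's config from the key/value types that stage emits.
    public static Map<String, String> stage(String outKey, String outValue) {
        Map<String, String> conf = new HashMap<>();
        conf.put(OUT_KEY, outKey);
        conf.put(OUT_VALUE, outValue);
        return conf;
    }

    // Stages 0 and 1 of the ordered-wordcount chain (Reducer2 omitted).
    public static List<Map<String, String>> orderedWordCount() {
        return Arrays.asList(
            stage("Text", "IntWritable"),   // stage 0: Mapper1 emits <word, 1> (assumed)
            stage("IntWritable", "Text"));  // stage 1: Reducer1 emits <count, word>
    }

    // Key class carried by the edge into stage i: whatever stage i-1 emits.
    public static String edgeKeyClass(List<Map<String, String>> stageConfs, int i) {
        return stageConfs.get(i - 1).get(OUT_KEY);
    }

    // Key class stage i would assume if it consulted only its own config.
    public static String ownConfKeyClass(List<Map<String, String>> stageConfs, int i) {
        return stageConfs.get(i).get(OUT_KEY);
    }

    public static void main(String[] args) {
        List<Map<String, String>> confs = orderedWordCount();
        System.out.println("edge into stage 1 delivers keys of type: " + edgeKeyClass(confs, 1));
        System.out.println("stage 1's own config says:               " + ownConfKeyClass(confs, 1));
    }
}
```

In this model the edge into stage 1 delivers Text keys while stage 1's own config names IntWritable, which is the same shape as the ClassCastException in the report. The two extra settings in the quoted patch amount to handing stage i an explicit copy of stage i-1's types.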