You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tez.apache.org by Jeff Hurt <Je...@clickfox.com> on 2014/06/16 19:15:09 UTC

Tez DAG question

All,

I have a scenario where I have three Map-Reduce jobs, and I would like to build this as a Tez DAG.  The basic design is that the first Map-Reduce job (M1,R1) should invoke both the second Map-Reduce job (M2,R2) and the third Map (M3) job.  In addition, each job should write out its results to a file in HDFS.

Graphically, a layout would look like this:

          M1
          |
          R1    (R1 writes output to HDFS)
          |
      M2 --- M3  (M3 has no reducer, writes output to HDFS)
      |
      R2  (R2 writes output to HDFS)

The results of R1 would be written out to HDFS, and would also be used as the inputs to both M2 and M3.

But, we have not been able to get this functionality to work.  Errors show up whenever our DAG contains more than just the first Map-Reduce job.

Here is the pseudocode:

        final byte[] map1Payload = MRHelpers.createUserPayloadFromConf(map1Conf);
        final byte[] map1InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map1Payload,
            Text.class.getName());

        final Vertex map1Vertex = new Vertex("M1",
            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map1Payload), -1,
            MRHelpers.getMapResource(map1Conf));
        mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map1Conf));

        final Map<String, String> map1Env = new HashMap<String, String>();
        MRHelpers.updateEnvironmentForMRTasks(map1Conf, map1Env, true);
        map1Vertex.setTaskEnvironment(map1Env);

        final Class<? extends TezRootInputInitializer> initializerClazz = MRInputAMSplitGenerator.class;
        MRHelpers.addMRInput(map1Vertex, map1InputPayload, initializerClazz);

        final byte[] reduce1Payload = MRHelpers.createUserPayloadFromConf(reduce1Conf);
        final Vertex reduce1Vertex = new Vertex("R1",
            new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(reduce1Payload), 1,
            MRHelpers.getReduceResource(reduce1Conf));
        reduce1Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce1Conf));

        final Map<String, String> reduce1Env = new HashMap<String, String>();
        MRHelpers.updateEnvironmentForMRTasks(reduce1Conf, reduce1Env, false);
        reduceVertex.setTaskEnvironment(reduce1Env);

        MRHelpers.addMROutputLegacy(reduce1Vertex, reduce1Payload);

        final byte[] map2Payload = MRHelpers.createUserPayloadFromConf(map2Conf);
        final byte[] map2InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map2Payload,
            Text.class.getName());

        final Vertex map2Vertex = new Vertex("M2",
            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map2Payload), -1,
            MRHelpers.getMapResource(map2Conf));
        mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map2Conf));

        final Map<String, String> map2Env = new HashMap<String, String>();
        MRHelpers.updateEnvironmentForMRTasks(map2Conf, map2Env, true);
        map2Vertex.setTaskEnvironment(map2Env);

        final byte[] reduce2Payload = MRHelpers.createUserPayloadFromConf(reduce2Conf);
        final Vertex reduce2Vertex = new Vertex("R2",
            new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(reduce2Payload), 1,
            MRHelpers.getReduceResource(reduce2Conf));
        reduce1Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce2Conf));

        final Map<String, String> reduce2Env = new HashMap<String, String>();
        MRHelpers.updateEnvironmentForMRTasks(reduce2Conf, reduce2Env, false);
        reduceVertex.setTaskEnvironment(reduce2Env);

        MRHelpers.addMROutputLegacy(reduce2Vertex, reduce2Payload);

        final byte[] map3Payload = MRHelpers.createUserPayloadFromConf(map3Conf);
        final byte[] map3InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map3Payload,
            Text.class.getName());

        final Vertex map3Vertex = new Vertex("M3",
            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map3Payload), -1,
            MRHelpers.getMapResource(map3Conf));
        mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map3Conf));

        final Map<String, String> map3Env = new HashMap<String, String>();
        MRHelpers.updateEnvironmentForMRTasks(map3Conf, map3Env, true);
        map2Vertex.setTaskEnvironment(map3Env);
       MRHelpers.addMROutputLegacy(map3Vertex, map3Payload);

       DAG dag = new DAG();
        dag.addEdge(new Edge(map1Vertex, reduce1Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));

        dag.addEdge(new Edge(reduce1Vertex, map2Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));

        dag.addEdge(new Edge(reduce1Vertex, map3Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));

        dag.addEdge(new Edge(map2Vertex, reduce2Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));




(Note the use of "MRHelpers.addMROutputLegacy" - it is placed on each reducer)

We have noticed that whenever we ONLY run Map-Reduce 1 (M1,R1); everything works fine.  But when we add Map-Reduce 2 or Map 3; we start to get errors.

Is there a way to have multiple vertices write output to HDFS in the same DAG?  Are there code examples of doing this?

FYI:  We are using HDP 2.1, with Tez 0.4.0-incubating.

Thanks in advance,
Jeff Hurt

RE: Tez DAG question

Posted by Jeff Hurt <Je...@clickfox.com>.
Hi Hitesh,

I'll answer your questions "inline" [Jeff Hurt].

Thanks!
-Jeff
________________________________________
From: Hitesh Shah [hitesh@apache.org]
Sent: Monday, June 16, 2014 2:04 PM
To: dev@tez.incubator.apache.org
Cc: user@tez.incubator.apache.org
Subject: Re: Tez DAG question

Hi Jeff

A few questions to clarify your use-case:
   - You seem to have clarified that you need each stage to write to HDFS but just wanted to confirm in any case whether this is a strict requirement or something just done for faster recovery in case of failures?

[Jeff Hurt] Yes, we want each stage to write out a separate file (or files) in HDFS.  The resulting files will contain different data sets and will be used for different purposes.

   - Would I be correct in saying that you want to send the data generated by R1 to both M2 and M3 and also write a replica of that data set to HDFS?

[Jeff Hurt] That is correct.

   - Does the data sent from R1 also need to be sorted and partitioned before being sent to the downstream vertices?

[Jeff Hurt] Not necessarily.  The entire result set from R1 will be iterated on in M2 and M3, to retrieve separate results.

   - How much of your logic inside the Mappers and Reducers is tied to MapReduce? If you are wiling to write your own processor instead of using a Mapper/Reducer, you will probably be able to leverage more performance benefits. For example. the logic in M2 and M3 could possibly be combined into a single vertex/processor. The single processor could write the required output to HDFS that M3 would have generated and likewise generate the required intermediate data needed by R2.

[Jeff Hurt]  That is a good idea!  We will look into a different processor, and touch base with our results.

>From a Tez point of view, the MapProcessor and ReduceProcessor were written pretty much to provide MR compatibility when strictly used in an M->R dag or M->R->R…->R ( straight-line DAGs ). They do not handle multiple outputs nor multiple inputs.

That said, I believe your use-case should be something that can be addressed in Tez. However, there are a couple of things lacking:
   - Support for something called a “shared edge”. This effectively means a vertex generating data on a given Output and that same data being sent downstream to different downstream Inputs ( edges ). Today, it is a strict 1:1 relationship
   - An edge that uses HDFS to transfer data has not been built yet. This would allow R1 to write data to HDFS and have M2 and M3 read from HDFS. In your use-case, today, one would need generate data twice - one for the shuffle edge and one for HDFS and have the shuffle edge data being sent downstream. But that would not be supported by the Map/Reduce Processors.

thanks
— Hitesh


On Jun 16, 2014, at 10:15 AM, Jeff Hurt <Je...@clickfox.com> wrote:

> All,
>
> I have a scenario where I have three Map-Reduce jobs, and I would like to build this as a Tez DAG.  The basic design is that the first Map-Reduce job (M1,R1) should invoke both the second Map-Reduce job (M2,R2) and the third Map (M3) job.  In addition, each job should write out its results to a file in HDFS.
>
> Graphically, a layout would look like this:
>
>          M1
>          |
>          R1    (R1 writes output to HDFS)
>          |
>      M2 --- M3  (M3 has no reducer, writes output to HDFS)
>      |
>      R2  (R2 writes output to HDFS)
>
> The results of R1 would be written out to HDFS, and would also be used as the inputs to both M2 and M3.
>
> But, we have not been able to get this functionality to work.  Errors show up whenever our DAG contains more than just the first Map-Reduce job.
>
> Here is the pseudocode:
>
>        final byte[] map1Payload = MRHelpers.createUserPayloadFromConf(map1Conf);
>        final byte[] map1InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map1Payload,
>            Text.class.getName());
>
>        final Vertex map1Vertex = new Vertex("M1",
>            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map1Payload), -1,
>            MRHelpers.getMapResource(map1Conf));
>        mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map1Conf));
>
>        final Map<String, String> map1Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(map1Conf, map1Env, true);
>        map1Vertex.setTaskEnvironment(map1Env);
>
>        final Class<? extends TezRootInputInitializer> initializerClazz = MRInputAMSplitGenerator.class;
>        MRHelpers.addMRInput(map1Vertex, map1InputPayload, initializerClazz);
>
>        final byte[] reduce1Payload = MRHelpers.createUserPayloadFromConf(reduce1Conf);
>        final Vertex reduce1Vertex = new Vertex("R1",
>            new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(reduce1Payload), 1,
>            MRHelpers.getReduceResource(reduce1Conf));
>        reduce1Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce1Conf));
>
>        final Map<String, String> reduce1Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(reduce1Conf, reduce1Env, false);
>        reduceVertex.setTaskEnvironment(reduce1Env);
>
>        MRHelpers.addMROutputLegacy(reduce1Vertex, reduce1Payload);
>
>        final byte[] map2Payload = MRHelpers.createUserPayloadFromConf(map2Conf);
>        final byte[] map2InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map2Payload,
>            Text.class.getName());
>
>        final Vertex map2Vertex = new Vertex("M2",
>            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map2Payload), -1,
>            MRHelpers.getMapResource(map2Conf));
>        mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map2Conf));
>
>        final Map<String, String> map2Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(map2Conf, map2Env, true);
>        map2Vertex.setTaskEnvironment(map2Env);
>
>        final byte[] reduce2Payload = MRHelpers.createUserPayloadFromConf(reduce2Conf);
>        final Vertex reduce2Vertex = new Vertex("R2",
>            new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(reduce2Payload), 1,
>            MRHelpers.getReduceResource(reduce2Conf));
>        reduce1Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce2Conf));
>
>        final Map<String, String> reduce2Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(reduce2Conf, reduce2Env, false);
>        reduceVertex.setTaskEnvironment(reduce2Env);
>
>        MRHelpers.addMROutputLegacy(reduce2Vertex, reduce2Payload);
>
>        final byte[] map3Payload = MRHelpers.createUserPayloadFromConf(map3Conf);
>        final byte[] map3InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map3Payload,
>            Text.class.getName());
>
>        final Vertex map3Vertex = new Vertex("M3",
>            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map3Payload), -1,
>            MRHelpers.getMapResource(map3Conf));
>        mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map3Conf));
>
>        final Map<String, String> map3Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(map3Conf, map3Env, true);
>        map2Vertex.setTaskEnvironment(map3Env);
>       MRHelpers.addMROutputLegacy(map3Vertex, map3Payload);
>
>       DAG dag = new DAG();
>        dag.addEdge(new Edge(map1Vertex, reduce1Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
>            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
>                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
>
>        dag.addEdge(new Edge(reduce1Vertex, map2Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
>            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
>                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
>
>        dag.addEdge(new Edge(reduce1Vertex, map3Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
>            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
>                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
>
>        dag.addEdge(new Edge(map2Vertex, reduce2Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
>            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
>                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
>
>
>
>
> (Note the use of "MRHelpers.addMROutputLegacy" - it is placed on each reducer)
>
> We have noticed that whenever we ONLY run Map-Reduce 1 (M1,R1); everything works fine.  But when we add Map-Reduce 2 or Map 3; we start to get errors.
>
> Is there a way to have multiple vertices write output to HDFS in the same DAG?  Are there code examples of doing this?
>
> FYI:  We are using HDP 2.1, with Tez 0.4.0-incubating.
>
> Thanks in advance,
> Jeff Hurt


Re: Tez DAG question

Posted by Hitesh Shah <hi...@apache.org>.
Hi Jeff

A few questions to clarify your use-case:
   - You seem to have clarified that you need each stage to write to HDFS but just wanted to confirm in any case whether this is a strict requirement or something just done for faster recovery in case of failures? 
   - Would I be correct in saying that you want to send the data generated by R1 to both M2 and M3 and also write a replica of that data set to HDFS? 
   - Does the data sent from R1 also need to be sorted and partitioned before being sent to the downstream vertices?
   - How much of your logic inside the Mappers and Reducers is tied to MapReduce? If you are wiling to write your own processor instead of using a Mapper/Reducer, you will probably be able to leverage more performance benefits. For example. the logic in M2 and M3 could possibly be combined into a single vertex/processor. The single processor could write the required output to HDFS that M3 would have generated and likewise generate the required intermediate data needed by R2.

From a Tez point of view, the MapProcessor and ReduceProcessor were written pretty much to provide MR compatibility when strictly used in an M->R dag or M->R->R…->R ( straight-line DAGs ). They do not handle multiple outputs nor multiple inputs.

That said, I believe your use-case should be something that can be addressed in Tez. However, there are a couple of things lacking:
   - Support for something called a “shared edge”. This effectively means a vertex generating data on a given Output and that same data being sent downstream to different downstream Inputs ( edges ). Today, it is a strict 1:1 relationship
   - An edge that uses HDFS to transfer data has not been built yet. This would allow R1 to write data to HDFS and have M2 and M3 read from HDFS. In your use-case, today, one would need generate data twice - one for the shuffle edge and one for HDFS and have the shuffle edge data being sent downstream. But that would not be supported by the Map/Reduce Processors.

thanks
— Hitesh


On Jun 16, 2014, at 10:15 AM, Jeff Hurt <Je...@clickfox.com> wrote:

> All,
> 
> I have a scenario where I have three Map-Reduce jobs, and I would like to build this as a Tez DAG.  The basic design is that the first Map-Reduce job (M1,R1) should invoke both the second Map-Reduce job (M2,R2) and the third Map (M3) job.  In addition, each job should write out its results to a file in HDFS.
> 
> Graphically, a layout would look like this:
> 
>          M1
>          |
>          R1    (R1 writes output to HDFS)
>          |
>      M2 --- M3  (M3 has no reducer, writes output to HDFS)
>      |
>      R2  (R2 writes output to HDFS)
> 
> The results of R1 would be written out to HDFS, and would also be used as the inputs to both M2 and M3.
> 
> But, we have not been able to get this functionality to work.  Errors show up whenever our DAG contains more than just the first Map-Reduce job.
> 
> Here is the pseudocode:
> 
>        final byte[] map1Payload = MRHelpers.createUserPayloadFromConf(map1Conf);
>        final byte[] map1InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map1Payload,
>            Text.class.getName());
> 
>        final Vertex map1Vertex = new Vertex("M1",
>            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map1Payload), -1,
>            MRHelpers.getMapResource(map1Conf));
>        mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map1Conf));
> 
>        final Map<String, String> map1Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(map1Conf, map1Env, true);
>        map1Vertex.setTaskEnvironment(map1Env);
> 
>        final Class<? extends TezRootInputInitializer> initializerClazz = MRInputAMSplitGenerator.class;
>        MRHelpers.addMRInput(map1Vertex, map1InputPayload, initializerClazz);
> 
>        final byte[] reduce1Payload = MRHelpers.createUserPayloadFromConf(reduce1Conf);
>        final Vertex reduce1Vertex = new Vertex("R1",
>            new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(reduce1Payload), 1,
>            MRHelpers.getReduceResource(reduce1Conf));
>        reduce1Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce1Conf));
> 
>        final Map<String, String> reduce1Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(reduce1Conf, reduce1Env, false);
>        reduceVertex.setTaskEnvironment(reduce1Env);
> 
>        MRHelpers.addMROutputLegacy(reduce1Vertex, reduce1Payload);
> 
>        final byte[] map2Payload = MRHelpers.createUserPayloadFromConf(map2Conf);
>        final byte[] map2InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map2Payload,
>            Text.class.getName());
> 
>        final Vertex map2Vertex = new Vertex("M2",
>            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map2Payload), -1,
>            MRHelpers.getMapResource(map2Conf));
>        mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map2Conf));
> 
>        final Map<String, String> map2Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(map2Conf, map2Env, true);
>        map2Vertex.setTaskEnvironment(map2Env);
> 
>        final byte[] reduce2Payload = MRHelpers.createUserPayloadFromConf(reduce2Conf);
>        final Vertex reduce2Vertex = new Vertex("R2",
>            new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(reduce2Payload), 1,
>            MRHelpers.getReduceResource(reduce2Conf));
>        reduce1Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce2Conf));
> 
>        final Map<String, String> reduce2Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(reduce2Conf, reduce2Env, false);
>        reduceVertex.setTaskEnvironment(reduce2Env);
> 
>        MRHelpers.addMROutputLegacy(reduce2Vertex, reduce2Payload);
> 
>        final byte[] map3Payload = MRHelpers.createUserPayloadFromConf(map3Conf);
>        final byte[] map3InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map3Payload,
>            Text.class.getName());
> 
>        final Vertex map3Vertex = new Vertex("M3",
>            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map3Payload), -1,
>            MRHelpers.getMapResource(map3Conf));
>        mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map3Conf));
> 
>        final Map<String, String> map3Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(map3Conf, map3Env, true);
>        map2Vertex.setTaskEnvironment(map3Env);
>       MRHelpers.addMROutputLegacy(map3Vertex, map3Payload);
> 
>       DAG dag = new DAG();
>        dag.addEdge(new Edge(map1Vertex, reduce1Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
>            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
>                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
> 
>        dag.addEdge(new Edge(reduce1Vertex, map2Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
>            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
>                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
> 
>        dag.addEdge(new Edge(reduce1Vertex, map3Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
>            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
>                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
> 
>        dag.addEdge(new Edge(map2Vertex, reduce2Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
>            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
>                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
> 
> 
> 
> 
> (Note the use of "MRHelpers.addMROutputLegacy" - it is placed on each reducer)
> 
> We have noticed that whenever we ONLY run Map-Reduce 1 (M1,R1); everything works fine.  But when we add Map-Reduce 2 or Map 3; we start to get errors.
> 
> Is there a way to have multiple vertices write output to HDFS in the same DAG?  Are there code examples of doing this?
> 
> FYI:  We are using HDP 2.1, with Tez 0.4.0-incubating.
> 
> Thanks in advance,
> Jeff Hurt


RE: Tez DAG question

Posted by Jeff Hurt <Je...@clickfox.com>.
I first got this error:


2014-06-16 12:43:50,992 INFO [AsyncDispatcher event handler] org.apache.tez.dag.history.HistoryEventHandler: [HISTORY][DAG:dag_1401404900565_0722_1][Event:TASK_ATTEMPT_FINISHED]: vertexName=map2vertex, taskAttemptId=attempt_1401404900565_0722_1_02_000000_3, startTime=1402937029418, finishTime=1402937030991, timeTaken=1573, status=FAILED, diagnostics=Error: java.io.IOException: org.apache.tez.dag.api.TezException: Only MRInputLegacy supported. Input: class org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy
        at org.apache.tez.mapreduce.processor.map.MapProcessor.run(MapProcessor.java:111)
        at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:307)
        at org.apache.hadoop.mapred.YarnTezDagChild$5.run(YarnTezDagChild.java:564)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1557)
        at org.apache.hadoop.mapred.YarnTezDagChild.main(YarnTezDagChild.java:553)
Caused by: org.apache.tez.dag.api.TezException: Only MRInputLegacy supported. Input: class org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy
        ... 7 more

I assumed it was because of this:

        dag.addEdge(new Edge(reduce1Vertex, map2Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));

(In particular, the usage of "ShuffledMergedInputLegacy" as part of the InputDescriptor constructor.)

So, I changed this line to read:

        dag.addEdge(new Edge(reduce1Vertex, map2Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
                .getName()), new InputDescriptor(MRInputLegacy.class.getName()))));


I then ran again, and got this error:


2014-06-16 13:27:51,507 INFO [AsyncDispatcher event handler] org.apache.tez.dag.history.HistoryEventHandler: [HISTORY][DAG:dag_1401404900565_0727_1][Event:TASK_ATTEMPT_FINISHED]: vertexName=map2vertex, taskAttemptId=attempt_1401404900565_0727_1_02_000000_3, startTime=1402939670074, finishTime=1402939671506, timeTaken=1432, status=FAILED, diagnostics=Error: com.google.protobuf.InvalidProtocolBufferException: Protocol message tag had invalid wire type.
        at com.google.protobuf.InvalidProtocolBufferException.invalidWireType(InvalidProtocolBufferException.java:99)
        at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:498)
        at com.google.protobuf.GeneratedMessage.parseUnknownField(GeneratedMessage.java:193)
        at org.apache.tez.mapreduce.protos.MRRuntimeProtos$MRInputUserPayloadProto.<init>(MRRuntimeProtos.java:1658)
        at org.apache.tez.mapreduce.protos.MRRuntimeProtos$MRInputUserPayloadProto.<init>(MRRuntimeProtos.java:1616)
        at org.apache.tez.mapreduce.protos.MRRuntimeProtos$MRInputUserPayloadProto$1.parsePartialFrom(MRRuntimeProtos.java:1717)
        at org.apache.tez.mapreduce.protos.MRRuntimeProtos$MRInputUserPayloadProto$1.parsePartialFrom(MRRuntimeProtos.java:1712)
        at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
        at org.apache.tez.mapreduce.protos.MRRuntimeProtos$MRInputUserPayloadProto.parseFrom(MRRuntimeProtos.java:1936)
        at org.apache.tez.mapreduce.hadoop.MRHelpers.parseMRInputPayload(MRHelpers.java:726)
        at org.apache.tez.mapreduce.input.MRInput.initialize(MRInput.java:122)
        at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask$InitializeInputCallable.call(LogicalIOProcessorRuntimeTask.java:368)
        at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask$InitializeInputCallable.call(LogicalIOProcessorRuntimeTask.java:344)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

What should be done here?
-Jeff
________________________________
From: Vinod Kumar Vavilapalli [vinodkv@hortonworks.com]
Sent: Monday, June 16, 2014 11:30 AM
To: user@tez.incubator.apache.org
Cc: dev@tez.incubator.apache.org
Subject: Re: Tez DAG question

Can you post the exceptions/error logs?

+Vinod
Hortonworks Inc.
http://hortonworks.com/


On Mon, Jun 16, 2014 at 10:15 AM, Jeff Hurt <Je...@clickfox.com>> wrote:
All,

I have a scenario where I have three Map-Reduce jobs, and I would like to build this as a Tez DAG.  The basic design is that the first Map-Reduce job (M1,R1) should invoke both the second Map-Reduce job (M2,R2) and the third Map (M3) job.  In addition, each job should write out its results to a file in HDFS.

Graphically, a layout would look like this:

          M1
          |
          R1    (R1 writes output to HDFS)
          |
      M2 --- M3  (M3 has no reducer, writes output to HDFS)
      |
      R2  (R2 writes output to HDFS)

The results of R1 would be written out to HDFS, and would also be used as the inputs to both M2 and M3.

But, we have not been able to get this functionality to work.  Errors show up whenever our DAG contains more than just the first Map-Reduce job.

Here is the pseudocode:

        final byte[] map1Payload = MRHelpers.createUserPayloadFromConf(map1Conf);
        final byte[] map1InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map1Payload,
            Text.class.getName());

        final Vertex map1Vertex = new Vertex("M1",
            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map1Payload), -1,
            MRHelpers.getMapResource(map1Conf));
        mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map1Conf));

        final Map<String, String> map1Env = new HashMap<String, String>();
        MRHelpers.updateEnvironmentForMRTasks(map1Conf, map1Env, true);
        map1Vertex.setTaskEnvironment(map1Env);

        final Class<? extends TezRootInputInitializer> initializerClazz = MRInputAMSplitGenerator.class;
        MRHelpers.addMRInput(map1Vertex, map1InputPayload, initializerClazz);

        final byte[] reduce1Payload = MRHelpers.createUserPayloadFromConf(reduce1Conf);
        final Vertex reduce1Vertex = new Vertex("R1",
            new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(reduce1Payload), 1,
            MRHelpers.getReduceResource(reduce1Conf));
        reduce1Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce1Conf));

        final Map<String, String> reduce1Env = new HashMap<String, String>();
        MRHelpers.updateEnvironmentForMRTasks(reduce1Conf, reduce1Env, false);
        reduceVertex.setTaskEnvironment(reduce1Env);

        MRHelpers.addMROutputLegacy(reduce1Vertex, reduce1Payload);

        final byte[] map2Payload = MRHelpers.createUserPayloadFromConf(map2Conf);
        final byte[] map2InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map2Payload,
            Text.class.getName());

        final Vertex map2Vertex = new Vertex("M2",
            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map2Payload), -1,
            MRHelpers.getMapResource(map2Conf));
        mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map2Conf));

        final Map<String, String> map2Env = new HashMap<String, String>();
        MRHelpers.updateEnvironmentForMRTasks(map2Conf, map2Env, true);
        map2Vertex.setTaskEnvironment(map2Env);

        final byte[] reduce2Payload = MRHelpers.createUserPayloadFromConf(reduce2Conf);
        final Vertex reduce2Vertex = new Vertex("R2",
            new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(reduce2Payload), 1,
            MRHelpers.getReduceResource(reduce2Conf));
        reduce1Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce2Conf));

        final Map<String, String> reduce2Env = new HashMap<String, String>();
        MRHelpers.updateEnvironmentForMRTasks(reduce2Conf, reduce2Env, false);
        reduceVertex.setTaskEnvironment(reduce2Env);

        MRHelpers.addMROutputLegacy(reduce2Vertex, reduce2Payload);

        final byte[] map3Payload = MRHelpers.createUserPayloadFromConf(map3Conf);
        final byte[] map3InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map3Payload,
            Text.class.getName());

        final Vertex map3Vertex = new Vertex("M3",
            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map3Payload), -1,
            MRHelpers.getMapResource(map3Conf));
        mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map3Conf));

        final Map<String, String> map3Env = new HashMap<String, String>();
        MRHelpers.updateEnvironmentForMRTasks(map3Conf, map3Env, true);
        map2Vertex.setTaskEnvironment(map3Env);
       MRHelpers.addMROutputLegacy(map3Vertex, map3Payload);

       DAG dag = new DAG();
        dag.addEdge(new Edge(map1Vertex, reduce1Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));

        dag.addEdge(new Edge(reduce1Vertex, map2Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));

        dag.addEdge(new Edge(reduce1Vertex, map3Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));

        dag.addEdge(new Edge(map2Vertex, reduce2Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));




(Note the use of "MRHelpers.addMROutputLegacy" - it is placed on each reducer)

We have noticed that whenever we ONLY run Map-Reduce 1 (M1,R1); everything works fine.  But when we add Map-Reduce 2 or Map 3; we start to get errors.

Is there a way to have multiple vertices write output to HDFS in the same DAG?  Are there code examples of doing this?

FYI:  We are using HDP 2.1, with Tez 0.4.0-incubating.

Thanks in advance,
Jeff Hurt


CONFIDENTIALITY NOTICE
NOTICE: This message is intended for the use of the individual or entity to which it is addressed and may contain information that is confidential, privileged and exempt from disclosure under applicable law. If the reader of this message is not the intended recipient, you are hereby notified that any printing, copying, dissemination, distribution, disclosure or forwarding of this communication is strictly prohibited. If you have received this communication in error, please contact the sender immediately and delete it from your system. Thank You.

RE: Tez DAG question

Posted by Jeff Hurt <Je...@clickfox.com>.
I first got this error:


2014-06-16 12:43:50,992 INFO [AsyncDispatcher event handler] org.apache.tez.dag.history.HistoryEventHandler: [HISTORY][DAG:dag_1401404900565_0722_1][Event:TASK_ATTEMPT_FINISHED]: vertexName=map2vertex, taskAttemptId=attempt_1401404900565_0722_1_02_000000_3, startTime=1402937029418, finishTime=1402937030991, timeTaken=1573, status=FAILED, diagnostics=Error: java.io.IOException: org.apache.tez.dag.api.TezException: Only MRInputLegacy supported. Input: class org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy
        at org.apache.tez.mapreduce.processor.map.MapProcessor.run(MapProcessor.java:111)
        at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:307)
        at org.apache.hadoop.mapred.YarnTezDagChild$5.run(YarnTezDagChild.java:564)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1557)
        at org.apache.hadoop.mapred.YarnTezDagChild.main(YarnTezDagChild.java:553)
Caused by: org.apache.tez.dag.api.TezException: Only MRInputLegacy supported. Input: class org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy
        ... 7 more

I assumed it was because of this:

        dag.addEdge(new Edge(reduce1Vertex, map2Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));

(In particular, the usage of "ShuffledMergedInputLegacy" as part of the InputDescriptor constructor.)

So, I changed this line to read:

        dag.addEdge(new Edge(reduce1Vertex, map2Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
                .getName()), new InputDescriptor(MRInputLegacy.class.getName()))));


I then ran again, and got this error:


2014-06-16 13:27:51,507 INFO [AsyncDispatcher event handler] org.apache.tez.dag.history.HistoryEventHandler: [HISTORY][DAG:dag_1401404900565_0727_1][Event:TASK_ATTEMPT_FINISHED]: vertexName=map2vertex, taskAttemptId=attempt_1401404900565_0727_1_02_000000_3, startTime=1402939670074, finishTime=1402939671506, timeTaken=1432, status=FAILED, diagnostics=Error: com.google.protobuf.InvalidProtocolBufferException: Protocol message tag had invalid wire type.
        at com.google.protobuf.InvalidProtocolBufferException.invalidWireType(InvalidProtocolBufferException.java:99)
        at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:498)
        at com.google.protobuf.GeneratedMessage.parseUnknownField(GeneratedMessage.java:193)
        at org.apache.tez.mapreduce.protos.MRRuntimeProtos$MRInputUserPayloadProto.<init>(MRRuntimeProtos.java:1658)
        at org.apache.tez.mapreduce.protos.MRRuntimeProtos$MRInputUserPayloadProto.<init>(MRRuntimeProtos.java:1616)
        at org.apache.tez.mapreduce.protos.MRRuntimeProtos$MRInputUserPayloadProto$1.parsePartialFrom(MRRuntimeProtos.java:1717)
        at org.apache.tez.mapreduce.protos.MRRuntimeProtos$MRInputUserPayloadProto$1.parsePartialFrom(MRRuntimeProtos.java:1712)
        at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
        at org.apache.tez.mapreduce.protos.MRRuntimeProtos$MRInputUserPayloadProto.parseFrom(MRRuntimeProtos.java:1936)
        at org.apache.tez.mapreduce.hadoop.MRHelpers.parseMRInputPayload(MRHelpers.java:726)
        at org.apache.tez.mapreduce.input.MRInput.initialize(MRInput.java:122)
        at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask$InitializeInputCallable.call(LogicalIOProcessorRuntimeTask.java:368)
        at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask$InitializeInputCallable.call(LogicalIOProcessorRuntimeTask.java:344)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

What should be done here?
-Jeff
________________________________
From: Vinod Kumar Vavilapalli [vinodkv@hortonworks.com]
Sent: Monday, June 16, 2014 11:30 AM
To: user@tez.incubator.apache.org
Cc: dev@tez.incubator.apache.org
Subject: Re: Tez DAG question

Can you post the exceptions/error logs?

+Vinod
Hortonworks Inc.
http://hortonworks.com/


On Mon, Jun 16, 2014 at 10:15 AM, Jeff Hurt <Je...@clickfox.com>> wrote:
All,

I have a scenario where I have three Map-Reduce jobs, and I would like to build this as a Tez DAG.  The basic design is that the first Map-Reduce job (M1,R1) should invoke both the second Map-Reduce job (M2,R2) and the third Map (M3) job.  In addition, each job should write out its results to a file in HDFS.

Graphically, a layout would look like this:

          M1
          |
          R1    (R1 writes output to HDFS)
          |
      M2 --- M3  (M3 has no reducer, writes output to HDFS)
      |
      R2  (R2 writes output to HDFS)

The results of R1 would be written out to HDFS, and would also be used as the inputs to both M2 and M3.

But, we have not been able to get this functionality to work.  Errors show up whenever our DAG contains more than just the first Map-Reduce job.

Here is the pseudocode:

        final byte[] map1Payload = MRHelpers.createUserPayloadFromConf(map1Conf);
        final byte[] map1InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map1Payload,
            Text.class.getName());

        final Vertex map1Vertex = new Vertex("M1",
            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map1Payload), -1,
            MRHelpers.getMapResource(map1Conf));
        mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map1Conf));

        final Map<String, String> map1Env = new HashMap<String, String>();
        MRHelpers.updateEnvironmentForMRTasks(map1Conf, map1Env, true);
        map1Vertex.setTaskEnvironment(map1Env);

        final Class<? extends TezRootInputInitializer> initializerClazz = MRInputAMSplitGenerator.class;
        MRHelpers.addMRInput(map1Vertex, map1InputPayload, initializerClazz);

        final byte[] reduce1Payload = MRHelpers.createUserPayloadFromConf(reduce1Conf);
        final Vertex reduce1Vertex = new Vertex("R1",
            new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(reduce1Payload), 1,
            MRHelpers.getReduceResource(reduce1Conf));
        reduce1Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce1Conf));

        final Map<String, String> reduce1Env = new HashMap<String, String>();
        MRHelpers.updateEnvironmentForMRTasks(reduce1Conf, reduce1Env, false);
        reduceVertex.setTaskEnvironment(reduce1Env);

        MRHelpers.addMROutputLegacy(reduce1Vertex, reduce1Payload);

        final byte[] map2Payload = MRHelpers.createUserPayloadFromConf(map2Conf);
        final byte[] map2InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map2Payload,
            Text.class.getName());

        final Vertex map2Vertex = new Vertex("M2",
            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map2Payload), -1,
            MRHelpers.getMapResource(map2Conf));
        mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map2Conf));

        final Map<String, String> map2Env = new HashMap<String, String>();
        MRHelpers.updateEnvironmentForMRTasks(map2Conf, map2Env, true);
        map2Vertex.setTaskEnvironment(map2Env);

        final byte[] reduce2Payload = MRHelpers.createUserPayloadFromConf(reduce2Conf);
        final Vertex reduce2Vertex = new Vertex("R2",
            new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(reduce2Payload), 1,
            MRHelpers.getReduceResource(reduce2Conf));
        reduce1Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce2Conf));

        final Map<String, String> reduce2Env = new HashMap<String, String>();
        MRHelpers.updateEnvironmentForMRTasks(reduce2Conf, reduce2Env, false);
        reduceVertex.setTaskEnvironment(reduce2Env);

        MRHelpers.addMROutputLegacy(reduce2Vertex, reduce2Payload);

        final byte[] map3Payload = MRHelpers.createUserPayloadFromConf(map3Conf);
        final byte[] map3InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map3Payload,
            Text.class.getName());

        final Vertex map3Vertex = new Vertex("M3",
            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map3Payload), -1,
            MRHelpers.getMapResource(map3Conf));
        mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map3Conf));

        final Map<String, String> map3Env = new HashMap<String, String>();
        MRHelpers.updateEnvironmentForMRTasks(map3Conf, map3Env, true);
        map2Vertex.setTaskEnvironment(map3Env);
       MRHelpers.addMROutputLegacy(map3Vertex, map3Payload);

       DAG dag = new DAG();
        dag.addEdge(new Edge(map1Vertex, reduce1Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));

        dag.addEdge(new Edge(reduce1Vertex, map2Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));

        dag.addEdge(new Edge(reduce1Vertex, map3Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));

        dag.addEdge(new Edge(map2Vertex, reduce2Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));




(Note the use of "MRHelpers.addMROutputLegacy" - it is placed on each reducer)

We have noticed that whenever we ONLY run Map-Reduce 1 (M1,R1); everything works fine.  But when we add Map-Reduce 2 or Map 3; we start to get errors.

Is there a way to have multiple vertices write output to HDFS in the same DAG?  Are there code examples of doing this?

FYI:  We are using HDP 2.1, with Tez 0.4.0-incubating.

Thanks in advance,
Jeff Hurt


CONFIDENTIALITY NOTICE
NOTICE: This message is intended for the use of the individual or entity to which it is addressed and may contain information that is confidential, privileged and exempt from disclosure under applicable law. If the reader of this message is not the intended recipient, you are hereby notified that any printing, copying, dissemination, distribution, disclosure or forwarding of this communication is strictly prohibited. If you have received this communication in error, please contact the sender immediately and delete it from your system. Thank You.

Re: Tez DAG question

Posted by Vinod Kumar Vavilapalli <vi...@hortonworks.com>.
Can you post the exceptions/error logs?

+Vinod
Hortonworks Inc.
http://hortonworks.com/


On Mon, Jun 16, 2014 at 10:15 AM, Jeff Hurt <Je...@clickfox.com> wrote:

>  All,
>
> I have a scenario where I have three Map-Reduce jobs, and I would like to
> build this as a Tez DAG.  The basic design is that the first Map-Reduce job
> (M1,R1) should invoke both the second Map-Reduce job (M2,R2) and the third
> Map (M3) job.  In addition, each job should write out its results to a file
> in HDFS.
>
> Graphically, a layout would look like this:
>
>           M1
>           |
>           R1    (R1 writes output to HDFS)
>           |
>       M2 --- M3  (M3 has no reducer, writes output to HDFS)
>       |
>       R2  (R2 writes output to HDFS)
>
> The results of R1 would be written out to HDFS, and would also be used as
> the inputs to both M2 and M3.
>
> But, we have not been able to get this functionality to work.  Errors show
> up whenever our DAG contains more than just the first Map-Reduce job.
>
> Here is the pseudocode:
>
>         final byte[] map1Payload =
> MRHelpers.createUserPayloadFromConf(map1Conf);
>         final byte[] map1InputPayload =
> MRHelpers.createMRInputPayloadWithGrouping(map1Payload,
>             Text.class.getName());
>
>         final Vertex map1Vertex = new Vertex("M1",
>             new
> ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map1Payload),
> -1,
>             MRHelpers.getMapResource(map1Conf));
>         mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map1Conf));
>
>         final Map<String, String> map1Env = new HashMap<String, String>();
>         MRHelpers.updateEnvironmentForMRTasks(map1Conf, map1Env, true);
>         map1Vertex.setTaskEnvironment(map1Env);
>
>         final Class<? extends TezRootInputInitializer> initializerClazz =
> MRInputAMSplitGenerator.class;
>         MRHelpers.addMRInput(map1Vertex, map1InputPayload,
> initializerClazz);
>
>         final byte[] reduce1Payload =
> MRHelpers.createUserPayloadFromConf(reduce1Conf);
>         final Vertex reduce1Vertex = new Vertex("R1",
>             new
> ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(reduce1Payload),
> 1,
>             MRHelpers.getReduceResource(reduce1Conf));
>
> reduce1Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce1Conf));
>
>         final Map<String, String> reduce1Env = new HashMap<String,
> String>();
>         MRHelpers.updateEnvironmentForMRTasks(reduce1Conf, reduce1Env,
> false);
>         reduceVertex.setTaskEnvironment(reduce1Env);
>
>         MRHelpers.addMROutputLegacy(reduce1Vertex, reduce1Payload);
>
>         final byte[] map2Payload =
> MRHelpers.createUserPayloadFromConf(map2Conf);
>         final byte[] map2InputPayload =
> MRHelpers.createMRInputPayloadWithGrouping(map2Payload,
>             Text.class.getName());
>
>         final Vertex map2Vertex = new Vertex("M2",
>             new
> ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map2Payload),
> -1,
>             MRHelpers.getMapResource(map2Conf));
>         mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map2Conf));
>
>         final Map<String, String> map2Env = new HashMap<String, String>();
>         MRHelpers.updateEnvironmentForMRTasks(map2Conf, map2Env, true);
>         map2Vertex.setTaskEnvironment(map2Env);
>
>         final byte[] reduce2Payload =
> MRHelpers.createUserPayloadFromConf(reduce2Conf);
>         final Vertex reduce2Vertex = new Vertex("R2",
>             new
> ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(reduce2Payload),
> 1,
>             MRHelpers.getReduceResource(reduce2Conf));
>
> reduce1Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce2Conf));
>
>         final Map<String, String> reduce2Env = new HashMap<String,
> String>();
>         MRHelpers.updateEnvironmentForMRTasks(reduce2Conf, reduce2Env,
> false);
>         reduceVertex.setTaskEnvironment(reduce2Env);
>
>         MRHelpers.addMROutputLegacy(reduce2Vertex, reduce2Payload);
>
>         final byte[] map3Payload =
> MRHelpers.createUserPayloadFromConf(map3Conf);
>         final byte[] map3InputPayload =
> MRHelpers.createMRInputPayloadWithGrouping(map3Payload,
>             Text.class.getName());
>
>         final Vertex map3Vertex = new Vertex("M3",
>             new
> ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map3Payload),
> -1,
>             MRHelpers.getMapResource(map3Conf));
>         mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map3Conf));
>
>         final Map<String, String> map3Env = new HashMap<String, String>();
>         MRHelpers.updateEnvironmentForMRTasks(map3Conf, map3Env, true);
>         map2Vertex.setTaskEnvironment(map3Env);
>        MRHelpers.addMROutputLegacy(map3Vertex, map3Payload);
>
>        DAG dag = new DAG();
>         dag.addEdge(new Edge(map1Vertex, reduce1Vertex, new
> EdgeProperty(DataMovementType.SCATTER_GATHER,
>             DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new
> OutputDescriptor(OnFileSortedOutput.class
>                 .getName()), new
> InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
>
>         dag.addEdge(new Edge(reduce1Vertex, map2Vertex, new
> EdgeProperty(DataMovementType.SCATTER_GATHER,
>             DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new
> OutputDescriptor(OnFileSortedOutput.class
>                 .getName()), new
> InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
>
>         dag.addEdge(new Edge(reduce1Vertex, map3Vertex, new
> EdgeProperty(DataMovementType.SCATTER_GATHER,
>             DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new
> OutputDescriptor(OnFileSortedOutput.class
>                 .getName()), new
> InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
>
>         dag.addEdge(new Edge(map2Vertex, reduce2Vertex, new
> EdgeProperty(DataMovementType.SCATTER_GATHER,
>             DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new
> OutputDescriptor(OnFileSortedOutput.class
>                 .getName()), new
> InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
>
>
>
>
> (Note the use of "MRHelpers.addMROutputLegacy" - it is placed on each
> reducer)
>
> We have noticed that whenever we ONLY run Map-Reduce 1 (M1,R1); everything
> works fine.  But when we add Map-Reduce 2 or Map 3; we start to get errors.
>
> Is there a way to have multiple vertices write output to HDFS in the same
> DAG?  Are there code examples of doing this?
>
> FYI:  We are using HDP 2.1, with Tez 0.4.0-incubating.
>
> Thanks in advance,
> Jeff Hurt
>

-- 
CONFIDENTIALITY NOTICE
NOTICE: This message is intended for the use of the individual or entity to 
which it is addressed and may contain information that is confidential, 
privileged and exempt from disclosure under applicable law. If the reader 
of this message is not the intended recipient, you are hereby notified that 
any printing, copying, dissemination, distribution, disclosure or 
forwarding of this communication is strictly prohibited. If you have 
received this communication in error, please contact the sender immediately 
and delete it from your system. Thank You.

Re: Tez DAG question

Posted by Hitesh Shah <hi...@apache.org>.
Hi Jeff

A few questions to clarify your use-case:
   - You seem to have clarified that you need each stage to write to HDFS but just wanted to confirm in any case whether this is a strict requirement or something just done for faster recovery in case of failures? 
   - Would I be correct in saying that you want to send the data generated by R1 to both M2 and M3 and also write a replica of that data set to HDFS? 
   - Does the data sent from R1 also need to be sorted and partitioned before being sent to the downstream vertices?
   - How much of your logic inside the Mappers and Reducers is tied to MapReduce? If you are wiling to write your own processor instead of using a Mapper/Reducer, you will probably be able to leverage more performance benefits. For example. the logic in M2 and M3 could possibly be combined into a single vertex/processor. The single processor could write the required output to HDFS that M3 would have generated and likewise generate the required intermediate data needed by R2.

From a Tez point of view, the MapProcessor and ReduceProcessor were written pretty much to provide MR compatibility when strictly used in an M->R dag or M->R->R…->R ( straight-line DAGs ). They do not handle multiple outputs nor multiple inputs.

That said, I believe your use-case should be something that can be addressed in Tez. However, there are a couple of things lacking:
   - Support for something called a “shared edge”. This effectively means a vertex generating data on a given Output and that same data being sent downstream to different downstream Inputs ( edges ). Today, it is a strict 1:1 relationship
   - An edge that uses HDFS to transfer data has not been built yet. This would allow R1 to write data to HDFS and have M2 and M3 read from HDFS. In your use-case, today, one would need generate data twice - one for the shuffle edge and one for HDFS and have the shuffle edge data being sent downstream. But that would not be supported by the Map/Reduce Processors.

thanks
— Hitesh


On Jun 16, 2014, at 10:15 AM, Jeff Hurt <Je...@clickfox.com> wrote:

> All,
> 
> I have a scenario where I have three Map-Reduce jobs, and I would like to build this as a Tez DAG.  The basic design is that the first Map-Reduce job (M1,R1) should invoke both the second Map-Reduce job (M2,R2) and the third Map (M3) job.  In addition, each job should write out its results to a file in HDFS.
> 
> Graphically, a layout would look like this:
> 
>          M1
>          |
>          R1    (R1 writes output to HDFS)
>          |
>      M2 --- M3  (M3 has no reducer, writes output to HDFS)
>      |
>      R2  (R2 writes output to HDFS)
> 
> The results of R1 would be written out to HDFS, and would also be used as the inputs to both M2 and M3.
> 
> But, we have not been able to get this functionality to work.  Errors show up whenever our DAG contains more than just the first Map-Reduce job.
> 
> Here is the pseudocode:
> 
>        final byte[] map1Payload = MRHelpers.createUserPayloadFromConf(map1Conf);
>        final byte[] map1InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map1Payload,
>            Text.class.getName());
> 
>        final Vertex map1Vertex = new Vertex("M1",
>            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map1Payload), -1,
>            MRHelpers.getMapResource(map1Conf));
>        mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map1Conf));
> 
>        final Map<String, String> map1Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(map1Conf, map1Env, true);
>        map1Vertex.setTaskEnvironment(map1Env);
> 
>        final Class<? extends TezRootInputInitializer> initializerClazz = MRInputAMSplitGenerator.class;
>        MRHelpers.addMRInput(map1Vertex, map1InputPayload, initializerClazz);
> 
>        final byte[] reduce1Payload = MRHelpers.createUserPayloadFromConf(reduce1Conf);
>        final Vertex reduce1Vertex = new Vertex("R1",
>            new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(reduce1Payload), 1,
>            MRHelpers.getReduceResource(reduce1Conf));
>        reduce1Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce1Conf));
> 
>        final Map<String, String> reduce1Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(reduce1Conf, reduce1Env, false);
>        reduceVertex.setTaskEnvironment(reduce1Env);
> 
>        MRHelpers.addMROutputLegacy(reduce1Vertex, reduce1Payload);
> 
>        final byte[] map2Payload = MRHelpers.createUserPayloadFromConf(map2Conf);
>        final byte[] map2InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map2Payload,
>            Text.class.getName());
> 
>        final Vertex map2Vertex = new Vertex("M2",
>            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map2Payload), -1,
>            MRHelpers.getMapResource(map2Conf));
>        mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map2Conf));
> 
>        final Map<String, String> map2Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(map2Conf, map2Env, true);
>        map2Vertex.setTaskEnvironment(map2Env);
> 
>        final byte[] reduce2Payload = MRHelpers.createUserPayloadFromConf(reduce2Conf);
>        final Vertex reduce2Vertex = new Vertex("R2",
>            new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(reduce2Payload), 1,
>            MRHelpers.getReduceResource(reduce2Conf));
>        reduce1Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce2Conf));
> 
>        final Map<String, String> reduce2Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(reduce2Conf, reduce2Env, false);
>        reduceVertex.setTaskEnvironment(reduce2Env);
> 
>        MRHelpers.addMROutputLegacy(reduce2Vertex, reduce2Payload);
> 
>        final byte[] map3Payload = MRHelpers.createUserPayloadFromConf(map3Conf);
>        final byte[] map3InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map3Payload,
>            Text.class.getName());
> 
>        final Vertex map3Vertex = new Vertex("M3",
>            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map3Payload), -1,
>            MRHelpers.getMapResource(map3Conf));
>        mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map3Conf));
> 
>        final Map<String, String> map3Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(map3Conf, map3Env, true);
>        map2Vertex.setTaskEnvironment(map3Env);
>       MRHelpers.addMROutputLegacy(map3Vertex, map3Payload);
> 
>       DAG dag = new DAG();
>        dag.addEdge(new Edge(map1Vertex, reduce1Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
>            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
>                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
> 
>        dag.addEdge(new Edge(reduce1Vertex, map2Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
>            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
>                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
> 
>        dag.addEdge(new Edge(reduce1Vertex, map3Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
>            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
>                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
> 
>        dag.addEdge(new Edge(map2Vertex, reduce2Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
>            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
>                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
> 
> 
> 
> 
> (Note the use of "MRHelpers.addMROutputLegacy" - it is placed on each reducer)
> 
> We have noticed that whenever we ONLY run Map-Reduce 1 (M1,R1); everything works fine.  But when we add Map-Reduce 2 or Map 3; we start to get errors.
> 
> Is there a way to have multiple vertices write output to HDFS in the same DAG?  Are there code examples of doing this?
> 
> FYI:  We are using HDP 2.1, with Tez 0.4.0-incubating.
> 
> Thanks in advance,
> Jeff Hurt


Re: Tez DAG question

Posted by Vinod Kumar Vavilapalli <vi...@hortonworks.com>.
Can you post the exceptions/error logs?

+Vinod
Hortonworks Inc.
http://hortonworks.com/


On Mon, Jun 16, 2014 at 10:15 AM, Jeff Hurt <Je...@clickfox.com> wrote:

>  All,
>
> I have a scenario where I have three Map-Reduce jobs, and I would like to
> build this as a Tez DAG.  The basic design is that the first Map-Reduce job
> (M1,R1) should invoke both the second Map-Reduce job (M2,R2) and the third
> Map (M3) job.  In addition, each job should write out its results to a file
> in HDFS.
>
> Graphically, a layout would look like this:
>
>           M1
>           |
>           R1    (R1 writes output to HDFS)
>           |
>       M2 --- M3  (M3 has no reducer, writes output to HDFS)
>       |
>       R2  (R2 writes output to HDFS)
>
> The results of R1 would be written out to HDFS, and would also be used as
> the inputs to both M2 and M3.
>
> But, we have not been able to get this functionality to work.  Errors show
> up whenever our DAG contains more than just the first Map-Reduce job.
>
> Here is the pseudocode:
>
>         final byte[] map1Payload =
> MRHelpers.createUserPayloadFromConf(map1Conf);
>         final byte[] map1InputPayload =
> MRHelpers.createMRInputPayloadWithGrouping(map1Payload,
>             Text.class.getName());
>
>         final Vertex map1Vertex = new Vertex("M1",
>             new
> ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map1Payload),
> -1,
>             MRHelpers.getMapResource(map1Conf));
>         mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map1Conf));
>
>         final Map<String, String> map1Env = new HashMap<String, String>();
>         MRHelpers.updateEnvironmentForMRTasks(map1Conf, map1Env, true);
>         map1Vertex.setTaskEnvironment(map1Env);
>
>         final Class<? extends TezRootInputInitializer> initializerClazz =
> MRInputAMSplitGenerator.class;
>         MRHelpers.addMRInput(map1Vertex, map1InputPayload,
> initializerClazz);
>
>         final byte[] reduce1Payload =
> MRHelpers.createUserPayloadFromConf(reduce1Conf);
>         final Vertex reduce1Vertex = new Vertex("R1",
>             new
> ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(reduce1Payload),
> 1,
>             MRHelpers.getReduceResource(reduce1Conf));
>
> reduce1Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce1Conf));
>
>         final Map<String, String> reduce1Env = new HashMap<String,
> String>();
>         MRHelpers.updateEnvironmentForMRTasks(reduce1Conf, reduce1Env,
> false);
>         reduceVertex.setTaskEnvironment(reduce1Env);
>
>         MRHelpers.addMROutputLegacy(reduce1Vertex, reduce1Payload);
>
>         final byte[] map2Payload =
> MRHelpers.createUserPayloadFromConf(map2Conf);
>         final byte[] map2InputPayload =
> MRHelpers.createMRInputPayloadWithGrouping(map2Payload,
>             Text.class.getName());
>
>         final Vertex map2Vertex = new Vertex("M2",
>             new
> ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map2Payload),
> -1,
>             MRHelpers.getMapResource(map2Conf));
>         mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map2Conf));
>
>         final Map<String, String> map2Env = new HashMap<String, String>();
>         MRHelpers.updateEnvironmentForMRTasks(map2Conf, map2Env, true);
>         map2Vertex.setTaskEnvironment(map2Env);
>
>         final byte[] reduce2Payload =
> MRHelpers.createUserPayloadFromConf(reduce2Conf);
>         final Vertex reduce2Vertex = new Vertex("R2",
>             new
> ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(reduce2Payload),
> 1,
>             MRHelpers.getReduceResource(reduce2Conf));
>
> reduce1Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce2Conf));
>
>         final Map<String, String> reduce2Env = new HashMap<String,
> String>();
>         MRHelpers.updateEnvironmentForMRTasks(reduce2Conf, reduce2Env,
> false);
>         reduceVertex.setTaskEnvironment(reduce2Env);
>
>         MRHelpers.addMROutputLegacy(reduce2Vertex, reduce2Payload);
>
>         final byte[] map3Payload =
> MRHelpers.createUserPayloadFromConf(map3Conf);
>         final byte[] map3InputPayload =
> MRHelpers.createMRInputPayloadWithGrouping(map3Payload,
>             Text.class.getName());
>
>         final Vertex map3Vertex = new Vertex("M3",
>             new
> ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map3Payload),
> -1,
>             MRHelpers.getMapResource(map3Conf));
>         mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(map3Conf));
>
>         final Map<String, String> map3Env = new HashMap<String, String>();
>         MRHelpers.updateEnvironmentForMRTasks(map3Conf, map3Env, true);
>         map2Vertex.setTaskEnvironment(map3Env);
>        MRHelpers.addMROutputLegacy(map3Vertex, map3Payload);
>
>        DAG dag = new DAG();
>         dag.addEdge(new Edge(map1Vertex, reduce1Vertex, new
> EdgeProperty(DataMovementType.SCATTER_GATHER,
>             DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new
> OutputDescriptor(OnFileSortedOutput.class
>                 .getName()), new
> InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
>
>         dag.addEdge(new Edge(reduce1Vertex, map2Vertex, new
> EdgeProperty(DataMovementType.SCATTER_GATHER,
>             DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new
> OutputDescriptor(OnFileSortedOutput.class
>                 .getName()), new
> InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
>
>         dag.addEdge(new Edge(reduce1Vertex, map3Vertex, new
> EdgeProperty(DataMovementType.SCATTER_GATHER,
>             DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new
> OutputDescriptor(OnFileSortedOutput.class
>                 .getName()), new
> InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
>
>         dag.addEdge(new Edge(map2Vertex, reduce2Vertex, new
> EdgeProperty(DataMovementType.SCATTER_GATHER,
>             DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new
> OutputDescriptor(OnFileSortedOutput.class
>                 .getName()), new
> InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
>
>
>
>
> (Note the use of "MRHelpers.addMROutputLegacy" - it is placed on each
> reducer)
>
> We have noticed that whenever we ONLY run Map-Reduce 1 (M1,R1); everything
> works fine.  But when we add Map-Reduce 2 or Map 3; we start to get errors.
>
> Is there a way to have multiple vertices write output to HDFS in the same
> DAG?  Are there code examples of doing this?
>
> FYI:  We are using HDP 2.1, with Tez 0.4.0-incubating.
>
> Thanks in advance,
> Jeff Hurt
>

-- 
CONFIDENTIALITY NOTICE
NOTICE: This message is intended for the use of the individual or entity to 
which it is addressed and may contain information that is confidential, 
privileged and exempt from disclosure under applicable law. If the reader 
of this message is not the intended recipient, you are hereby notified that 
any printing, copying, dissemination, distribution, disclosure or 
forwarding of this communication is strictly prohibited. If you have 
received this communication in error, please contact the sender immediately 
and delete it from your system. Thank You.