Posted to user@beam.apache.org by Akshay Mendole <ak...@gmail.com> on 2018/11/28 06:34:20 UTC

Why beam pipeline ends up creating gigantic DAG for a simple word count program !!

Hi,
    We are in the process of evaluating different execution engines (mainly
Apache Spark and Apache Flink) for our production batch and streaming
pipelines. We thought of using Apache Beam as a unified programming model
to write the pipelines. When we executed a simple wordcount
pipeline using both the flink-runner and the spark-runner, we saw that the DAG
for the pipeline in both Flink and Spark, when executed using Beam code, had a
lot of operators/nodes which we could not explain. When we wrote the same
wordcount program using the APIs provided by the underlying execution
engine, the DAGs were much simpler and could be easily explained.
Below is an example of the wordcount program executed in Spark.

This is the DAG when we executed this <https://pastebin.com/3MZZPgJk> code
developed using spark RDD APIs.
[image: Screen Shot 2018-11-28 at 11.39.35 AM.png]




This is the DAG when we executed this <https://pastebin.com/ABtUDmvC> code
developed using beam pipeline APIs.
[image: Screen Shot 2018-11-28 at 11.40.04 AM.png]
[image: Screen Shot 2018-11-28 at 11.40.11 AM.png]



We observed the *same* *behaviour* when we executed the pipeline using the
Flink runner.
While this is a simple word count, when we wrote our complex pipelines in
Beam and executed them, they led to DAGs which were almost
impossible to explain :-( .

We have the following questions:
1. Is the gigantic DAG expected?
2. If so, why? And will it have any performance impact?
3. Since the generated DAG cannot be explained, are there better ways to
understand it from a developer's point of view?

It would be great if someone could help us in this regard.

Thanks,
Akshay

Re: Why beam pipeline ends up creating gigantic DAG for a simple word count program !!

Posted by Robert Bradshaw <ro...@google.com>.
I'd also like to put the perspective out there that composite
transforms are like subroutines; their inner complexity should not
concern the end user and probably is the wrong thing to optimize for
(assuming there are not other costs, e.g. performance, and of course
we shouldn't have unnecessary complexity). This allows operations
like Write (as an example) to be built out of the same kind of
simpler pipeline operations, rather than having special classes of
operators such as sinks (which still need all the complexity (write out
temporary shards, move successful ones to a consistent naming, remove
temporaries, etc.) but where the structure is hard-coded into the
system and so less flexible).

A better representation of this hierarchical structure (vs. being
forced to look at the entire everything-unrolled-and-inlined view) is
what I think we should solve long-term. For word count it's annoying
(and surprising) that so much is going on; this becomes especially
important for larger pipelines with hundreds or thousands of stages
that may have high-level structure but become impenetrable in the
flattened, physical view.

At the very least, it'd be useful if these UIs had hooks where we
could map physical views into the logical views as understood by beam.
How best to represent these mappings is as yet an unsolved problem.
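
One possible shape for such a mapping, as a minimal sketch only: if every
physical step carries a hierarchical name with "/" separators, a UI could
group steps by their leading name components and draw one box per group.
The step names below and the collapse() helper are made up for
illustration; they are not taken from any runner or from Beam itself.

```python
def collapse(step_names, depth=1):
    """Group fully qualified step names by their first `depth` components."""
    groups = {}
    for name in step_names:
        key = "/".join(name.split("/")[:depth])
        groups.setdefault(key, []).append(name)
    return groups


# Hypothetical flattened (physical) view of a word count pipeline.
physical_steps = [
    "ReadLines/Read",
    "CountWords/ExtractWords",
    "CountWords/Count/GroupByKey",
    "CountWords/Count/Combine",
    "FormatResults",
    "WriteCounts/WriteTempFiles",
    "WriteCounts/GatherResults/GroupByKey",
    "WriteCounts/FinalizeWrite",
]

# Logical view: one entry per top-level composite.
logical_view = collapse(physical_steps, depth=1)
for composite, members in logical_view.items():
    print(f"{composite}: {len(members)} physical step(s)")
```

With these sample names the three write-related steps collapse into a
single WriteCounts entry, and raising depth expands one more level of
nesting at a time.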

- Robert


On Fri, Nov 30, 2018 at 12:40 PM Maximilian Michels <mx...@apache.org> wrote:
>
> Hi Akshay,
>
> I think you're bringing up a very important point. Simplicity with
> minimal complexity is something that we strive for. In the case of the
> Write transform, the complexity was mainly added due to historical
> reasons which Kenneth mentioned.
>
> It is worth noting that some Runners don't even benefit from it because
> they don't support incremental recovery. I believe we will do work in the
> future to simplify the Write transform.
>
> If you look at other pipelines which don't use that transform you will
> find that they are much simpler.
>
> What can really help is to not expand the composite transforms, but
> transforms need to be expanded during translation, and collapsing those
> transforms again after translation to Spark/Flink can be tricky.
>
> Generally speaking, we have realized this is an issue and have plans to
> fix it, e.g. https://issues.apache.org/jira/browse/BEAM-5859.
>
> Thanks,
> Max
>

Re: Why beam pipeline ends up creating gigantic DAG for a simple word count program !!

Posted by Maximilian Michels <mx...@apache.org>.
Hi Akshay,

I think you're bringing up a very important point. Simplicity with 
minimal complexity is something that we strive for. In the case of the 
Write transform, the complexity was mainly added due to historical 
reasons which Kenneth mentioned.

It is worth noting that some Runners don't even benefit from it because
they don't support incremental recovery. I believe we will do work in the
future to simplify the Write transform.

If you look at other pipelines which don't use that transform you will 
find that they are much simpler.

What can really help is to not expand the composite transforms, but
transforms need to be expanded during translation, and collapsing those
transforms again after translation to Spark/Flink can be tricky.

Generally speaking, we have realized this is an issue and have plans to 
fix it, e.g. https://issues.apache.org/jira/browse/BEAM-5859.

Thanks,
Max

On 28.11.18 16:52, Kenneth Knowles wrote:
> In fact, that logic in FileIO is "required" to have consistent output 
> even just for batch computation, because any step along the way may fail 
> and retry.
> 
> I put "required" in quotes because there's a legacy concern at play here 
> - FileIO is written using the assumption that shuffles are 
> checkpointing. What is actually required is that the input to the last 
> stage will be the same, even if the whole pipeline goes down and has to 
> be brought back up. So the extra shuffle in those pipelines represents a 
> necessary checkpoint prior to running the last stage. In the 
> saveAsTextFile primitive (caveat: my understanding is vague and stale) 
> this would be done in a checkpoint finalization callback, and you have 
> to wait for that to complete before consuming the output if you want to 
> ensure correctness.
> 
> Another relevant piece of information is that Beam has support for two 
> things that would make it easier to decipher the UI:
> 
> 1. Nodes can have meaningful names. So that would make it obvious which 
> part is doing what.
> 2. Transforms can be built as composites of other transforms, and this
> is encouraged. In some UIs, notably Cloud Dataflow, the composites are
> shown as a single box, so it is easier to understand.
> 
> I would not expect every engine to adopt all of Beam's features like 
> these, but there might be a clever way to make the information available.
> 
> Kenn
> 

Re: Why beam pipeline ends up creating gigantic DAG for a simple word count program !!

Posted by Kenneth Knowles <ke...@apache.org>.
In fact, that logic in FileIO is "required" to have consistent output even
just for batch computation, because any step along the way may fail and
retry.

I put "required" in quotes because there's a legacy concern at play here -
FileIO is written using the assumption that shuffles are checkpointing.
What is actually required is that the input to the last stage will be the
same, even if the whole pipeline goes down and has to be brought back up.
So the extra shuffle in those pipelines represents a necessary checkpoint
prior to running the last stage. In the saveAsTextFile primitive (caveat:
my understanding is vague and stale) this would be done in a checkpoint
finalization callback, and you have to wait for that to complete before
consuming the output if you want to ensure correctness.
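
To make the retry requirement concrete, here is a minimal sketch in plain
Python (not Beam or FileIO code). It assumes the list of (temporary,
final) file pairs has already been fixed, which is the role the
checkpoint plays above; given that, the last stage can be re-run after a
crash and still converge on the same output. The finalize() helper and
the file names are hypothetical.

```python
import os
import tempfile


def finalize(moves):
    """Move each temp file to its final name; safe to re-run after a crash."""
    for tmp_path, final_path in moves:
        if os.path.exists(tmp_path):
            # os.replace overwrites the destination if it already exists.
            os.replace(tmp_path, final_path)
        elif not os.path.exists(final_path):
            raise FileNotFoundError(f"lost both {tmp_path} and {final_path}")
        # Otherwise an earlier attempt already moved this file: nothing to do.


def demo():
    with tempfile.TemporaryDirectory() as d:
        moves = []
        for i in range(3):
            tmp = os.path.join(d, f".tmp-shard-{i}")
            with open(tmp, "w") as f:
                f.write(f"shard {i}\n")
            moves.append((tmp, os.path.join(d, f"out-{i:05d}-of-00003")))

        finalize(moves[:1])  # simulate an attempt that died after one move
        finalize(moves)      # retry with exactly the same input
        finalize(moves)      # a further retry changes nothing
        return sorted(os.listdir(d))
```

The sketch only works because `moves` is identical on every attempt; if a
retry recomputed a different set of temporary files, the final directory
could end up with a mix of outputs from different attempts.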

Another relevant piece of information is that Beam has support for two
things that would make it easier to decipher the UI:

1. Nodes can have meaningful names. So that would make it obvious which
part is doing what.
2. Transforms can be built as composites of other transforms, and this is
encouraged. In some UIs, notably Cloud Dataflow, the composites are shown
as a single box, so it is easier to understand.

I would not expect every engine to adopt all of Beam's features like these,
but there might be a clever way to make the information available.

Kenn

On Wed, Nov 28, 2018 at 12:27 AM Tim Robertson <ti...@gmail.com>
wrote:

> Hi Akshay
>
> My understanding is that this all comes from the final FileIO write()
> stage.
>
> When writing, the FileIO puts data into temporary files similar to the
> output formats for Hadoop MapReduce. Once ready to commit, it does
> something along the lines of a directory scan to determine which files need
> to be moved into the final output location. It is that directory scan stage
> that causes the complex DAG and it runs very quickly. While it looks
> complicated, I gather it is necessary to support the needs of
> batch/streaming and in particular the behaviour under failure scenarios.
>
> I agree with you that from a developer perspective it is very difficult to
> understand. If you were to replace the FileIO write() with e.g. a push into
> a database (JdbcIO), or ElasticsearchIO or SolrIO etc., you would see a much
> more familiar and simpler-to-understand DAG - it might be worth trying that
> to see. Over time I expect you will simply ignore that final job when
> looking at the DAG as you know it is just the output committer stage.
>
> I don't know if you are using HDFS but if so, please be aware of BEAM-5036
> [1] which is fixed in 2.9.0-SNAPSHOT and will be released with 2.9.0 in the
> coming days. It relates to what I outline above, where the files were
> actually copied into place rather than simply moved. On my jobs, I saw a
> very large increase in performance because of this fix, which brought Beam
> much closer to native Spark in terms of runtime performance.
>
> I hope this helps,
> Tim
>
>
> [1] https://issues.apache.org/jira/browse/BEAM-5036
>

Re: Why beam pipeline ends up creating gigantic DAG for a simple word count program !!

Posted by Tim Robertson <ti...@gmail.com>.
Hi Akshay

My understanding is that this all comes from the final FileIO write() stage.

When writing, the FileIO puts data into temporary files similar to the
output formats for Hadoop MapReduce. Once ready to commit, it does
something along the lines of a directory scan to determine which files need
to be moved into the final output location. It is that directory scan stage
that causes the complex DAG and it runs very quickly. While it looks
complicated, I gather it is necessary to support the needs of
batch/streaming and in particular the behaviour under failure scenarios.
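
As a rough illustration of that write-then-commit pattern, here is a
sketch in plain Python. It is not the FileIO implementation; the helper
names, the attempt-numbered temp files and the counts-NNNNN-of-NNNNN
naming are all assumptions made for the example.

```python
import os
import shutil
import tempfile


def write_temp_shard(temp_dir, shard_id, attempt, lines):
    """Write one attempt of one shard to its own temporary file."""
    path = os.path.join(temp_dir, f"shard-{shard_id}-attempt-{attempt}")
    with open(path, "w") as f:
        f.writelines(line + "\n" for line in lines)
    return path


def commit(temp_dir, output_dir, successful):
    """Move the successful temp files into place, then drop the leftovers.

    `successful` maps shard id to the temp file of the attempt that won.
    """
    os.makedirs(output_dir, exist_ok=True)
    total = len(successful)
    for index, shard_id in enumerate(sorted(successful)):
        final = os.path.join(output_dir, f"counts-{index:05d}-of-{total:05d}.txt")
        os.replace(successful[shard_id], final)
    # Anything still in temp_dir belongs to failed or duplicate attempts.
    shutil.rmtree(temp_dir)


def demo():
    with tempfile.TemporaryDirectory() as root:
        temp_dir = os.path.join(root, ".temp")
        output_dir = os.path.join(root, "output")
        os.makedirs(temp_dir)

        successful = {0: write_temp_shard(temp_dir, 0, 0, ["beam 2", "flink 1"])}
        # Shard 1: the first attempt fails part-way, the second succeeds.
        write_temp_shard(temp_dir, 1, 0, ["spa"])
        successful[1] = write_temp_shard(temp_dir, 1, 1, ["spark 3"])

        commit(temp_dir, output_dir, successful)
        return sorted(os.listdir(output_dir)), os.path.exists(temp_dir)
```

Readers of the output directory only ever see complete shards, and the
partial file from the failed attempt never reaches it. The extra
bookkeeping needed to know which attempts succeeded is roughly what shows
up as additional nodes in the DAG.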

I agree with you that from a developer perspective it is very difficult to
understand. If you were to replace the FileIO write() with e.g. a push into
a database (JdbcIO), or ElasticsearchIO or SolrIO etc., you would see a much
more familiar and simpler-to-understand DAG - it might be worth trying that
to see. Over time I expect you will simply ignore that final job when
looking at the DAG as you know it is just the output committer stage.

I don't know if you are using HDFS but if so, please be aware of BEAM-5036
[1] which is fixed in 2.9.0-SNAPSHOT and will be released with 2.9.0 in the
coming days. It relates to what I outline above, where the files were
actually copied into place rather than simply moved. On my jobs, I saw a
very large increase in performance because of this fix, which brought Beam
much closer to native Spark in terms of runtime performance.

I hope this helps,
Tim


[1] https://issues.apache.org/jira/browse/BEAM-5036
