Posted to user@crunch.apache.org by Tim van Heugten <st...@gmail.com> on 2013/01/30 10:33:49 UTC

MemPipeline and context

Hi,

Since April I've been using Crunch for a project. We're not doing only linear
executions of the pipeline, so we sometimes have issues with how
Crunch optimizes our execution graph. We need to add materializations
here and there as hints about which parts of the graph can be shared between
outputs and so on.

Recently we decided to see if 0.4.0-incubating would provide us any
improvements (I'm afraid not yet). Trying to adapt our code to the new API,
however, exposed some difficulties and issues. A few bugs have been
reported regarding those issues (CRUNCH-152 to CRUNCH-155), thank you for
picking them up.

The difficulties arise from the newly introduced tight coupling with
TaskInputOutputContext. Now in our JUnit tests we need to inject this
before the tests can run (many of our DoFns adjust counters or perform
progress() calls). So far so good, I can use
CrunchTestSupport.getTestContext(config) with a mocked config and call
setContext() on the DoFn. But some things are unclear:
* Should I call initialize() after setContext()?
* I can see initialize() is called in setContext(), but this doesn't seem
documented or guaranteed. Should setContext() be made final so it can be
documented that initialize() does not need to be called afterwards?
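
For reference, this is roughly the test setup I mean. It is only a minimal
sketch: the DoFn, the counter names and the test class are placeholders
standing in for our business code, not something from the Crunch codebase.

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.test.CrunchTestSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.junit.Test;

public class UpperCaseFnTest {

  // Placeholder DoFn standing in for our business code; it uses counters and progress().
  static class UpperCaseFn extends DoFn<String, String> {
    @Override
    public void process(String input, Emitter<String> emitter) {
      increment("stats", "records"); // the counter use is what forces us to inject a context
      progress();
      emitter.emit(input.toUpperCase());
    }
  }

  @Test
  public void processEmitsUpperCase() {
    TaskInputOutputContext<?, ?, ?, ?> context =
        CrunchTestSupport.getTestContext(new Configuration());

    UpperCaseFn fn = new UpperCaseFn();
    fn.setContext(context); // initialize() seems to be invoked from setContext()...
    // fn.initialize();     // ...but should we also call it ourselves? That is the question.

    fn.process("tim", new Emitter<String>() {
      public void emit(String emitted) { /* assert on the output here */ }
      public void flush() { }
    });
  }
}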

In our more elaborate tests we use MemPipeline to see the combined effect
of our DoFns. But there:
* MemCollection shows ambiguous behaviour wrt initialize()/setContext().
* A parallelDo with a PCollection output makes a call to *just*
initialize(), and a parallelDo with a PTable output makes a call to
*both* initialize() and setContext(). Currently this fails some of our
tests because we use counters and progress().
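
To make the difference concrete, here is a stripped-down sketch of the kind
of MemPipeline test I mean (the class, counter names and values are made up,
not our real job):

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.types.writable.Writables;

public class MemPipelineCounterDemo {
  public static void main(String[] args) {
    PCollection<String> words = MemPipeline.collectionOf("foo", "bar");

    // PCollection output: as far as we can tell, only initialize() is called on the DoFn.
    PCollection<String> upper = words.parallelDo(new DoFn<String, String>() {
      @Override
      public void process(String input, Emitter<String> emitter) {
        increment("stats", "upper"); // this is where our tests trip over the stubbed context
        progress();
        emitter.emit(input.toUpperCase());
      }
    }, Writables.strings());

    // PTable output: here both initialize() and setContext() appear to be called.
    PTable<String, Long> lengths = words.parallelDo(new DoFn<String, Pair<String, Long>>() {
      @Override
      public void process(String input, Emitter<Pair<String, Long>> emitter) {
        emitter.emit(Pair.of(input, (long) input.length()));
      }
    }, Writables.tableOf(Writables.strings(), Writables.longs()));

    System.out.println(upper.materialize());
    System.out.println(lengths.materialize());
  }
}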

Finally, I've had to create our own implementation of MemCollection
altogether*, because the stubbed TaskInputOutputContext is too limited for
our tests: *the stubbed TaskInputOutputContext in MemCollection is unable
to handle Counters*.
I'm aware that as much is stated in the javadoc, but I can't choose *not*
to use counters when testing the business code. Because Counters were
handled (and even counted) in 0.3.0, I feel confident enough about this
to raise the issue.

I'm very happy with the API of Crunch and would love for this project to
become more reliable and widely adopted. If there is anything I can do (or
any pointers on where to begin understanding the planning component), please
let me know.

Cheers,

Tim van Heugten

* Because I use MemPipeline in test contexts only, I rely on the mocked
instance from CrunchTestSupport.getTestContext(conf). Further, replacing
MemCollection implies replacing MemTable and MemGroupedTable as well.

Re: MemPipeline and context

Posted by Josh Wills <jw...@cloudera.com>.
I created https://issues.apache.org/jira/browse/CRUNCH-157 and
https://issues.apache.org/jira/browse/CRUNCH-158 to track these.


On Wed, Jan 30, 2013 at 6:07 AM, Josh Wills <jw...@cloudera.com> wrote:

> Hey Tim,
>
> Replies inlined.
>
>
> On Wed, Jan 30, 2013 at 1:33 AM, Tim van Heugten <st...@gmail.com> wrote:
>
>> Hi,
>>
>> Since april I'm using Crunch for a project. We're not doing only linear
>> executions of the pipeline, so we're sometimes having issues with how
>> Crunch is optimizing our execution graph. We need to add materializations
>> here and there as hints to what parts of the graph can be shared for
>> outputs and so on.
>>
>> Recently we decided to see if 0.4.0-incubating would provide us any
>> improvements (I'm afraid not yet). Trying to adapt our code to the new API,
>> however, exposed some difficulties and issues. A few bugs have been
>> reported regarding those issues (CRUNCH-152 to CRUNCH-155), thank you for
>> picking them up.
>>
>> The difficulties arise from the newly introduced tight coupling with
>> TaskInputOutputContext. Now in our JUnit tests we need to inject this
>> before the tests can run (many of our DoFns adjust counters or perform
>> progress() calls). So far so good, I can use
>> CrunchTestSupport.getTestContext(config) with a mocked config and call
>> setContext() on the DoFn. But some things are unclear:
>> * Should I call initialize() after setContext()?
>> * I can see initialize() is called in setContext(), but this doesn't seem
>> documented or guaranteed. Should setContext() be made final so it can be
>> documented that initialize() does not need to be called afterwards?
>>
>
> Yes, I think so. I'm not sure of the implications of doing that, but I'll
> create a branch and see what fails and what needs fixing.
>
>
>>
>> In our more elaborate tests we use MemPipeline to see the combined effect
>> of our DoFns. But there:
>> * MemCollection shows ambiguous behaviour wrt initialize()/setContext().
>> * A parallelDo with a PCollection output makes a call to *just*
>> initialize(), and a parallelDo with a PTable output makes a call to
>> *both* initialize() and setContext(). Currently this fails some of our
>> tests because we use counters and progress().
>>
>
> Yeah, that's no good. Let's open a JIRA for that one.
>
>
>>
>> Finally, I've had to create our own implementation of MemCollection
>> altogether*, because the stubbed TaskInputOutputContext is too limited for
>> our tests.
>> *Stubbed TaskInputOutputContext in MemCollection is unable to handle
>> Counters*.
>> I'm aware that as much is stated in the javadoc, but I can't choose *not*
>> to use counters when testing the business code. Because Counters were
>> handled (and even counted) in 0.3.0 I'm feeling confident enough about this
>> to raise the issue.
>>
>
> Agreed-- I didn't realize MemCollection gave up the ability to use
> Counters in that change-- I'm a bit surprised by that. Let's create a JIRA
> to put it back in.
>
>
>> I'm very happy with the api of crunch and would love for this project to
>> become more reliable and widely adopted. If there is anything I can do (or
>> instruction on where to begin understanding the planning component) please
>> let me know.
>>
>> Cheers,
>>
>> Tim van Heugten
>>
>> * Because I use MemPipeline in test contexts only I rely on the mocked
>> instance from CrunchTestSupport.getTestContext(conf);, further, replacing
>> MemCollection implies replacing MemTable and MemGroupedTable as well.
>>
>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Re: MemPipeline and context

Posted by Josh Wills <jw...@cloudera.com>.
Hey Tim,

Replies inlined.


On Wed, Jan 30, 2013 at 1:33 AM, Tim van Heugten <st...@gmail.com> wrote:

> Hi,
>
> Since april I'm using Crunch for a project. We're not doing only linear
> executions of the pipeline, so we're sometimes having issues with how
> Crunch is optimizing our execution graph. We need to add materializations
> here and there as hints to what parts of the graph can be shared for
> outputs and so on.
>
> Recently we decided to see if 0.4.0-incubating would provide us any
> improvements (I'm afraid not yet). Trying to adapt our code to the new API,
> however, exposed some difficulties and issues. A few bugs have been
> reported regarding those issues (CRUNCH-152 to CRUNCH-155), thank you for
> picking them up.
>
> The difficulties arise from the newly introduced tight coupling with
> TaskInputOutputContext. Now in our JUnit tests we need to inject this
> before the tests can run (many of our DoFns adjust counters or perform
> progress() calls). So far so good, I can use
> CrunchTestSupport.getTestContext(config) with a mocked config and call
> setContext() on the DoFn. But some things are unclear:
> * Should I call initialize() after setContext()?
> * I can see initialize() is called in setContext(), but this doesn't seem
> documented or guaranteed. Should setContext() be made final so it can be
> documented that initialize() does not need to be called afterwards?
>

Yes, I think so. I'm not sure of the implications of doing that, but I'll
create a branch and see what fails and what needs fixing.


>
> In our more elaborate tests we use MemPipeline to see the combined effect
> of our DoFns. But there:
> * MemCollection shows ambiguous behaviour wrt initialize()/setContext().
> * A parallelDo with a PCollection output makes a call to *just*
> initialize(), and a parallelDo with a PTable output makes a call to
> *both* initialize() and setContext(). Currently this fails some of our
> tests because we use counters and progress().
>

Yeah, that's no good. Let's open a JIRA for that one.


>
> Finally, I've had to create our own implementation of MemCollection
> altogether*, because the stubbed TaskInputOutputContext is too limited for
> our tests.
> *Stubbed TaskInputOutputContext in MemCollection is unable to handle
> Counters*.
> I'm aware that as much is stated in the javadoc, but I can't choose *not*
> to use counters when testing the business code. Because Counters were
> handled (and even counted) in 0.3.0 I'm feeling confident enough about this
> to raise the issue.
>

Agreed-- I didn't realize MemCollection gave up the ability to use Counters
in that change-- I'm a bit surprised by that. Let's create a JIRA to put it
back in.


> I'm very happy with the api of crunch and would love for this project to
> become more reliable and widely adopted. If there is anything I can do (or
> instruction on where to begin understanding the planning component) please
> let me know.
>
> Cheers,
>
> Tim van Heugten
>
> * Because I use MemPipeline in test contexts only I rely on the mocked
> instance from CrunchTestSupport.getTestContext(conf);, further, replacing
> MemCollection implies replacing MemTable and MemGroupedTable as well.
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Re: MemPipeline and context

Posted by Tim van Heugten <st...@gmail.com>.
Usually just to alter the default execution plan.

In this case, where the Crunch bug counteracted our own bug, we used it to
end up with the desired(*) output (in fact triggering the Crunch bug). We
have now fixed our bug and are no longer pursuing the Crunch bug.

In general I would not expect the output to be dependent on the execution
plan.


Cheers,

Tim

*) Here we had a discrepancy between desired and (technically) correct
output.

On Wed, Feb 6, 2013 at 2:07 PM, Gabriel Reid <ga...@gmail.com> wrote:

> Thanks for all the info Tim. I've posted a bit more information on
> CRUNCH-163, and will look into it more this evening.
>
> About calling materialize within pipelines, just to clarify: are you doing
> this both to get a more efficient execution (i.e. alter the default
> execution plan) as well as to get the correct output, or just one of those
> two?
>
> Thanks,
>
> Gabriel
>
>
> On Wed, Feb 6, 2013 at 11:53 AM, Tim van Heugten <st...@gmail.com> wrote:
>
>> To summarize:
>> - When we saw data duplication, that was what we should have been
>> expecting, given our implementation. That is not the issue.
>> - Sometimes we didn't see data duplication. That is an issue:
>>    *Union sometimes ignores one of the input branches.*
>>
>> I created https://issues.apache.org/jira/browse/CRUNCH-163 for this
>> issue. The tests singleUnion and doubleUnionWithoutMaterializeInbetween
>> pass in my environment (0.4), the others fail.
>> Besides breaking a union by adding a materialize after it I could also
>> break it by performing a parallelDo after it or by just joining two read
>> pCollections.
>>
>>
>> Cheers,
>>
>> Tim
>>
>>
>>
>>  On Tue, Feb 5, 2013 at 3:38 PM, Tim van Heugten <st...@gmail.com>wrote:
>>
>>> Hmmm,
>>>
>>> So we had a mistake in our code that emitted the data in both branches
>>> before union2.
>>> *And*, the crunch union also *failed to merge the data* in some
>>> circumstance. My side-remark about not seeing the join happen was actually
>>> bang on.. :-/
>>>
>>> So the question now becomes, when does a union ignore one of its
>>> incoming branches?
>>> Apparently with materialization in the right spots we can force the
>>> correct pipeline(*).
>>>
>>> Cheers,
>>>
>>> Tim van Heugten
>>>
>>>
>>> *) Thereby exposing our bug, seemingly data duplication. But just to be
>>> clear, this is actually the *correct* behavior.
>>>
>>>
>>>
>>> On Tue, Feb 5, 2013 at 3:18 PM, Tim van Heugten <st...@gmail.com>wrote:
>>>
>>>> Hi,
>>>>
>>>> It turns out the data in the two branches that are unioned in union2 is
>>>> not mutually exclusive (counter to what I was expecting). Probably we
>>>> should expect data duplication.
>>>>
>>>> However, this does still not explain why sometimes we find data
>>>> duplication and sometimes we don't.
>>>>
>>>> Will keep you posted,
>>>>
>>>> Tim
>>>>
>>>>
>>>> On Tue, Feb 5, 2013 at 11:32 AM, Tim van Heugten <st...@gmail.com>wrote:
>>>>
>>>>> Hi Gabriel,
>>>>>
>>>>> I've been unsuccessful so far to reproduce the issue in a controlled
>>>>> environment. As said, its fragile, maybe the types involved play a role, so
>>>>> when I tried to simplify those I broke the failure condition.
>>>>> I decide it's time to try providing more information without giving an
>>>>> explicit example.
>>>>>
>>>>> The pipeline we build is illustrated here: http://yuml.me/8ef99512.
>>>>> Depending on where we materialize the data occurs twice in UP.
>>>>> The EITPI job filters the exact opposite of the filter branch. In PWR
>>>>> only data from EITPI is passed through, while the PITP data is used to
>>>>> modify it.
>>>>> Below you find the job names as executed when dataduplication occurs,
>>>>> materializations occur before BTO(*) and after UP.
>>>>>
>>>>> "Avro(target/stored/sIPhase)+EITPI+GBK+PITEI+Avro(/tmp/crunch655004156/p4)"
>>>>>
>>>>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch655004156/p4)]]+GBK+PWR+UnionCollectionWrapper+Avro(/tmp/crunch655004156/p2)"
>>>>>
>>>>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch655004156/p4)]]+GBK+PWR+BTO+Avro(/tmp/crunch655004156/p8)"
>>>>>
>>>>> "[[Avro(target/stored/sIPhase)+S0+BTO]/[Avro(/tmp/crunch655004156/p8)]]+GBK+UP+Avro(/tmp/crunch655004156/p6)"
>>>>> "Avro(/tmp/crunch655004156/p6)+GetData+Avro(/tmp/crunch655004156/p10)"
>>>>>
>>>>> "Avro(/tmp/crunch655004156/p6)+GetTraces+Avro(target/trace-dump/traces)"
>>>>>
>>>>> Here are the jobs performed when materialization is added between BTO
>>>>> and gbk:
>>>>>
>>>>> "Avro(target/stored/sIPhase)+EITPI+GBK+PITEI+Avro(/tmp/crunch-551174870/p4)"
>>>>>
>>>>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch-551174870/p4)]]+GBK+PWR+UnionCollectionWrapper+Avro(/tmp/crunch-551174870/p2)"
>>>>>
>>>>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch-551174870/p4)]]+GBK+PWR+BTO+Avro(/tmp/crunch-551174870/p6)"
>>>>> "Avro(/tmp/crunch-551174870/p6)+GBK+UP+Avro(/tmp/crunch-551174870/p8)"
>>>>>
>>>>> "Avro(/tmp/crunch-551174870/p8)+GetData+Avro(/tmp/crunch-551174870/p10)"
>>>>>
>>>>> "Avro(/tmp/crunch-551174870/p8)+GetTraces+Avro(target/trace-dump/traces)"
>>>>>
>>>>> Without changing anything else, the added materialization
>>>>> fixes the issue of data duplication.
>>>>>
>>>>> If you have any clues how I can extract a clean working example I'm
>>>>> happy to hear.
>>>>>
>>>>>
>>>>> *) This materialization probably explains the second job, however,
>>>>> where the filtered data is joined is lost on me. This is not the cause
>>>>> though, with just one materialize at the end, after UP, the data count
>>>>> still doubled. The jobs then look like this:
>>>>>
>>>>> "Avro(target/stored/sIPhase)+EITPI+GBK+PITEI+Avro(/tmp/crunch369510677/p4)"
>>>>>
>>>>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch369510677/p4)]]+GBK+PWR+BTO+Avro(/tmp/crunch369510677/p6)"
>>>>>
>>>>> "[[Avro(target/stored/sIPhase)+S0+BTO]/[Avro(/tmp/crunch369510677/p6)]]+GBK+UP+Avro(/tmp/crunch369510677/p2)"
>>>>>
>>>>> "Avro(/tmp/crunch369510677/p2)+GetTraces+Avro(target/trace-dump/traces)"
>>>>> "Avro(/tmp/crunch369510677/p2)+GetData+Avro(/tmp/crunch369510677/p8)"
>>>>>
>>>>> BR,
>>>>>
>>>>> Tim van Heugten
>>>>>
>>>>>
>>>>> On Thu, Jan 31, 2013 at 9:27 PM, Gabriel Reid <ga...@gmail.com>wrote:
>>>>>
>>>>>> Hi Tim,
>>>>>>
>>>>>> On 31 Jan 2013, at 10:45, Tim van Heugten <st...@gmail.com> wrote:
>>>>>>
>>>>>> > Hi Gabriel,
>>>>>> >
>>>>>> > For the most part it is similar to what was send around recently on
>>>>>> this mailinglist, see:
>>>>>> > From  Dave Beech <d....@paraliatech.com>
>>>>>> > Subject       Question about mapreduce job planner
>>>>>> > Date  Tue, 15 Jan 2013 11:41:42 GMT
>>>>>> >
>>>>>> > So, the common path before multiple outputs branch is executed
>>>>>> twice. Sometimes the issues seem related to unions though, i.e. multiple
>>>>>> inputs. We seem to have been troubled by a grouped table parallelDo on a
>>>>>> table-union-gbk that got its data twice (all grouped doubled in size).
>>>>>> Inserting a materialize between the union and groupByKey solved the issue.
>>>>>> >
>>>>>> > These issues seem very fragile (so they're fixed easily by changing
>>>>>> something that's irrelevant to the output), so usually we just add or
>>>>>> remove a materialization to make it run again.
>>>>>> > I'll see if I can cleanly reproduce the data duplication issue
>>>>>> later this week.
>>>>>>
>>>>>> Ok, that would be great if you could replicate it in a small test,
>>>>>> thanks!
>>>>>>
>>>>>> - Gabriel
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: MemPipeline and context

Posted by Gabriel Reid <ga...@gmail.com>.
Thanks for all the info Tim. I've posted a bit more information on
CRUNCH-163, and will look into it more this evening.

About calling materialize within pipelines, just to clarify: are you doing
this both to get a more efficient execution (i.e. to alter the default
execution plan) and to get the correct output, or just one of those
two?

Thanks,

Gabriel


On Wed, Feb 6, 2013 at 11:53 AM, Tim van Heugten <st...@gmail.com> wrote:

> To summarize:
> - When we saw data duplication, that was what we should have been
> expecting, given our implementation. That is not the issue.
> - Sometimes we didn't see data duplication. That is an issue:
>    *Union sometimes ignores one of the input branches.*
>
> I created https://issues.apache.org/jira/browse/CRUNCH-163 for this
> issue. The tests singleUnion and doubleUnionWithoutMaterializeInbetween
> pass in my environment (0.4), the others fail.
> Besides breaking a union by adding a materialize after it I could also
> break it by performing a parallelDo after it or by just joining two read
> pCollections.
>
>
> Cheers,
>
> Tim
>
>
>
>  On Tue, Feb 5, 2013 at 3:38 PM, Tim van Heugten <st...@gmail.com> wrote:
>
>> Hmmm,
>>
>> So we had a mistake in our code that emitted the data in both branches
>> before union2.
>> *And*, the crunch union also *failed to merge the data* in some
>> circumstance. My side-remark about not seeing the join happen was actually
>> bang on.. :-/
>>
>> So the question now becomes, when does a union ignore one of its incoming
>> branches?
>> Apparently with materialization in the right spots we can force the
>> correct pipeline(*).
>>
>> Cheers,
>>
>> Tim van Heugten
>>
>>
>> *) Thereby exposing our bug, seemingly data duplication. But just to be
>> clear, this is actually the *correct* behavior.
>>
>>
>>
>> On Tue, Feb 5, 2013 at 3:18 PM, Tim van Heugten <st...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> It turns out the data in the two branches that are unioned in union2 is
>>> not mutually exclusive (counter to what I was expecting). Probably we
>>> should expect data duplication.
>>>
>>> However, this does still not explain why sometimes we find data
>>> duplication and sometimes we don't.
>>>
>>> Will keep you posted,
>>>
>>> Tim
>>>
>>>
>>> On Tue, Feb 5, 2013 at 11:32 AM, Tim van Heugten <st...@gmail.com>wrote:
>>>
>>>> Hi Gabriel,
>>>>
>>>> I've been unsuccessful so far to reproduce the issue in a controlled
>>>> environment. As said, its fragile, maybe the types involved play a role, so
>>>> when I tried to simplify those I broke the failure condition.
>>>> I decide it's time to try providing more information without giving an
>>>> explicit example.
>>>>
>>>> The pipeline we build is illustrated here: http://yuml.me/8ef99512.
>>>> Depending on where we materialize the data occurs twice in UP.
>>>> The EITPI job filters the exact opposite of the filter branch. In PWR
>>>> only data from EITPI is passed through, while the PITP data is used to
>>>> modify it.
>>>> Below you find the job names as executed when dataduplication occurs,
>>>> materializations occur before BTO(*) and after UP.
>>>>
>>>> "Avro(target/stored/sIPhase)+EITPI+GBK+PITEI+Avro(/tmp/crunch655004156/p4)"
>>>>
>>>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch655004156/p4)]]+GBK+PWR+UnionCollectionWrapper+Avro(/tmp/crunch655004156/p2)"
>>>>
>>>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch655004156/p4)]]+GBK+PWR+BTO+Avro(/tmp/crunch655004156/p8)"
>>>>
>>>> "[[Avro(target/stored/sIPhase)+S0+BTO]/[Avro(/tmp/crunch655004156/p8)]]+GBK+UP+Avro(/tmp/crunch655004156/p6)"
>>>> "Avro(/tmp/crunch655004156/p6)+GetData+Avro(/tmp/crunch655004156/p10)"
>>>> "Avro(/tmp/crunch655004156/p6)+GetTraces+Avro(target/trace-dump/traces)"
>>>>
>>>> Here are the jobs performed when materialization is added between BTO
>>>> and gbk:
>>>>
>>>> "Avro(target/stored/sIPhase)+EITPI+GBK+PITEI+Avro(/tmp/crunch-551174870/p4)"
>>>>
>>>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch-551174870/p4)]]+GBK+PWR+UnionCollectionWrapper+Avro(/tmp/crunch-551174870/p2)"
>>>>
>>>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch-551174870/p4)]]+GBK+PWR+BTO+Avro(/tmp/crunch-551174870/p6)"
>>>> "Avro(/tmp/crunch-551174870/p6)+GBK+UP+Avro(/tmp/crunch-551174870/p8)"
>>>> "Avro(/tmp/crunch-551174870/p8)+GetData+Avro(/tmp/crunch-551174870/p10)"
>>>>
>>>> "Avro(/tmp/crunch-551174870/p8)+GetTraces+Avro(target/trace-dump/traces)"
>>>>
>>>> Without changing anything else, the added materialization fixes
>>>> fixes the issue of data duplication.
>>>>
>>>> If you have any clues how I can extract a clean working example I'm
>>>> happy to hear.
>>>>
>>>>
>>>> *) This materialization probably explains the second job, however,
>>>> where the filtered data is joined is lost on me. This is not the cause
>>>> though, with just one materialize at the end, after UP, the data count
>>>> still doubled. The jobs then look like this:
>>>>
>>>> "Avro(target/stored/sIPhase)+EITPI+GBK+PITEI+Avro(/tmp/crunch369510677/p4)"
>>>>
>>>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch369510677/p4)]]+GBK+PWR+BTO+Avro(/tmp/crunch369510677/p6)"
>>>>
>>>> "[[Avro(target/stored/sIPhase)+S0+BTO]/[Avro(/tmp/crunch369510677/p6)]]+GBK+UP+Avro(/tmp/crunch369510677/p2)"
>>>> "Avro(/tmp/crunch369510677/p2)+GetTraces+Avro(target/trace-dump/traces)"
>>>> "Avro(/tmp/crunch369510677/p2)+GetData+Avro(/tmp/crunch369510677/p8)"
>>>>
>>>> BR,
>>>>
>>>> Tim van Heugten
>>>>
>>>>
>>>> On Thu, Jan 31, 2013 at 9:27 PM, Gabriel Reid <ga...@gmail.com>wrote:
>>>>
>>>>> Hi Tim,
>>>>>
>>>>> On 31 Jan 2013, at 10:45, Tim van Heugten <st...@gmail.com> wrote:
>>>>>
>>>>> > Hi Gabriel,
>>>>> >
>>>>> > For the most part it is similar to what was send around recently on
>>>>> this mailinglist, see:
>>>>> > From  Dave Beech <d....@paraliatech.com>
>>>>> > Subject       Question about mapreduce job planner
>>>>> > Date  Tue, 15 Jan 2013 11:41:42 GMT
>>>>> >
>>>>> > So, the common path before multiple outputs branch is executed
>>>>> twice. Sometimes the issues seem related to unions though, i.e. multiple
>>>>> inputs. We seem to have been troubled by a grouped table parallelDo on a
>>>>> table-union-gbk that got its data twice (all grouped doubled in size).
>>>>> Inserting a materialize between the union and groupByKey solved the issue.
>>>>> >
>>>>> > These issues seem very fragile (so they're fixed easily by changing
>>>>> something that's irrelevant to the output), so usually we just add or
>>>>> remove a materialization to make it run again.
>>>>> > I'll see if I can cleanly reproduce the data duplication issue later
>>>>> this week.
>>>>>
>>>>> Ok, that would be great if you could replicate it in a small test,
>>>>> thanks!
>>>>>
>>>>> - Gabriel
>>>>
>>>>
>>>>
>>>
>>
>

Re: MemPipeline and context

Posted by Tim van Heugten <st...@gmail.com>.
To summarize:
- When we saw data duplication, that was what we should have been
expecting, given our implementation. That is not the issue.
- Sometimes we didn't see data duplication. That is an issue:
   *Union sometimes ignores one of the input branches.*

I created https://issues.apache.org/jira/browse/CRUNCH-163 for this issue.
The tests singleUnion and doubleUnionWithoutMaterializeInbetween pass in my
environment (0.4); the others fail.
Besides breaking a union by adding a materialize after it, I could also
break it by performing a parallelDo after it or by just joining two read
PCollections.
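
For what it's worth, the failing cases look roughly like the sketch below.
The paths and class name are placeholders; this is not the exact test
attached to CRUNCH-163, just the shape of it:

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;

public class UnionRepro {
  public static void main(String[] args) {
    Pipeline pipeline = new MRPipeline(UnionRepro.class);

    // Two independently read inputs (placeholder paths).
    PCollection<String> left = pipeline.readTextFile("/tmp/input-a");
    PCollection<String> right = pipeline.readTextFile("/tmp/input-b");

    PCollection<String> union = left.union(right);

    // Adding a materialize (or a parallelDo, or reading both inputs like this)
    // after the union is what seems to make one of the branches disappear
    // from the output in our environment.
    for (String s : union.materialize()) {
      System.out.println(s);
    }
    pipeline.done();
  }
}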


Cheers,

Tim



On Tue, Feb 5, 2013 at 3:38 PM, Tim van Heugten <st...@gmail.com> wrote:

> Hmmm,
>
> So we had a mistake in our code that emitted the data in both branches
> before union2.
> *And*, the crunch union also *failed to merge the data* in some
> circumstance. My side-remark about not seeing the join happen was actually
> bang on.. :-/
>
> So the question now becomes, when does a union ignore one of its incoming
> branches?
> Apparently with materialization in the right spots we can force the
> correct pipeline(*).
>
> Cheers,
>
> Tim van Heugten
>
>
> *) Thereby exposing our bug, seemingly data duplication. But just to be
> clear, this is actually the *correct* behavior.
>
>
>
> On Tue, Feb 5, 2013 at 3:18 PM, Tim van Heugten <st...@gmail.com> wrote:
>
>> Hi,
>>
>> It turns out the data in the two branches that are unioned in union2 is
>> not mutually exclusive (counter to what I was expecting). Probably we
>> should expect data duplication.
>>
>> However, this does still not explain why sometimes we find data
>> duplication and sometimes we don't.
>>
>> Will keep you posted,
>>
>> Tim
>>
>>
>> On Tue, Feb 5, 2013 at 11:32 AM, Tim van Heugten <st...@gmail.com>wrote:
>>
>>> Hi Gabriel,
>>>
>>> I've been unsuccessful so far to reproduce the issue in a controlled
>>> environment. As said, its fragile, maybe the types involved play a role, so
>>> when I tried to simplify those I broke the failure condition.
>>> I decide it's time to try providing more information without giving an
>>> explicit example.
>>>
>>> The pipeline we build is illustrated here: http://yuml.me/8ef99512.
>>> Depending on where we materialize the data occurs twice in UP.
>>> The EITPI job filters the exact opposite of the filter branch. In PWR
>>> only data from EITPI is passed through, while the PITP data is used to
>>> modify it.
>>> Below you find the job names as executed when dataduplication occurs,
>>> materializations occur before BTO(*) and after UP.
>>>
>>> "Avro(target/stored/sIPhase)+EITPI+GBK+PITEI+Avro(/tmp/crunch655004156/p4)"
>>>
>>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch655004156/p4)]]+GBK+PWR+UnionCollectionWrapper+Avro(/tmp/crunch655004156/p2)"
>>>
>>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch655004156/p4)]]+GBK+PWR+BTO+Avro(/tmp/crunch655004156/p8)"
>>>
>>> "[[Avro(target/stored/sIPhase)+S0+BTO]/[Avro(/tmp/crunch655004156/p8)]]+GBK+UP+Avro(/tmp/crunch655004156/p6)"
>>> "Avro(/tmp/crunch655004156/p6)+GetData+Avro(/tmp/crunch655004156/p10)"
>>> "Avro(/tmp/crunch655004156/p6)+GetTraces+Avro(target/trace-dump/traces)"
>>>
>>> Here are the jobs performed when materialization is added between BTO
>>> and gbk:
>>>
>>> "Avro(target/stored/sIPhase)+EITPI+GBK+PITEI+Avro(/tmp/crunch-551174870/p4)"
>>>
>>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch-551174870/p4)]]+GBK+PWR+UnionCollectionWrapper+Avro(/tmp/crunch-551174870/p2)"
>>>
>>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch-551174870/p4)]]+GBK+PWR+BTO+Avro(/tmp/crunch-551174870/p6)"
>>> "Avro(/tmp/crunch-551174870/p6)+GBK+UP+Avro(/tmp/crunch-551174870/p8)"
>>> "Avro(/tmp/crunch-551174870/p8)+GetData+Avro(/tmp/crunch-551174870/p10)"
>>> "Avro(/tmp/crunch-551174870/p8)+GetTraces+Avro(target/trace-dump/traces)"
>>>
>>> Without changing anything else, the added materialization fixes
>>> the issue of data duplication.
>>>
>>> If you have any clues how I can extract a clean working example I'm
>>> happy to hear.
>>>
>>>
>>> *) This materialization probably explains the second job, however, where
>>> the filtered data is joined is lost on me. This is not the cause though,
>>> with just one materialize at the end, after UP, the data count still
>>> doubled. The jobs then look like this:
>>>
>>> "Avro(target/stored/sIPhase)+EITPI+GBK+PITEI+Avro(/tmp/crunch369510677/p4)"
>>>
>>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch369510677/p4)]]+GBK+PWR+BTO+Avro(/tmp/crunch369510677/p6)"
>>>
>>> "[[Avro(target/stored/sIPhase)+S0+BTO]/[Avro(/tmp/crunch369510677/p6)]]+GBK+UP+Avro(/tmp/crunch369510677/p2)"
>>> "Avro(/tmp/crunch369510677/p2)+GetTraces+Avro(target/trace-dump/traces)"
>>> "Avro(/tmp/crunch369510677/p2)+GetData+Avro(/tmp/crunch369510677/p8)"
>>>
>>> BR,
>>>
>>> Tim van Heugten
>>>
>>>
>>> On Thu, Jan 31, 2013 at 9:27 PM, Gabriel Reid <ga...@gmail.com>wrote:
>>>
>>>> Hi Tim,
>>>>
>>>> On 31 Jan 2013, at 10:45, Tim van Heugten <st...@gmail.com> wrote:
>>>>
>>>> > Hi Gabriel,
>>>> >
>>>> > For the most part it is similar to what was send around recently on
>>>> this mailinglist, see:
>>>> > From  Dave Beech <d....@paraliatech.com>
>>>> > Subject       Question about mapreduce job planner
>>>> > Date  Tue, 15 Jan 2013 11:41:42 GMT
>>>> >
>>>> > So, the common path before multiple outputs branch is executed twice.
>>>> Sometimes the issues seem related to unions though, i.e. multiple inputs.
>>>> We seem to have been troubled by a grouped table parallelDo on a
>>>> table-union-gbk that got its data twice (all grouped doubled in size).
>>>> Inserting a materialize between the union and groupByKey solved the issue.
>>>> >
>>>> > These issues seem very fragile (so they're fixed easily by changing
>>>> something that's irrelevant to the output), so usually we just add or
>>>> remove a materialization to make it run again.
>>>> > I'll see if I can cleanly reproduce the data duplication issue later
>>>> this week.
>>>>
>>>> Ok, that would be great if you could replicate it in a small test,
>>>> thanks!
>>>>
>>>> - Gabriel
>>>
>>>
>>>
>>
>

Re: MemPipeline and context

Posted by Tim van Heugten <st...@gmail.com>.
Hmmm,

So we had a mistake in our code that emitted the data in both branches
before union2.
*And*, the Crunch union also *failed to merge the data* in some
circumstances. My side remark about not seeing the join happen was actually
bang on... :-/

So the question now becomes: when does a union ignore one of its incoming
branches?
Apparently, with materialization in the right spots, we can force the correct
pipeline(*).

Cheers,

Tim van Heugten


*) Thereby exposing our bug, seemingly data duplication. But just to be
clear, this is actually the *correct* behavior.


On Tue, Feb 5, 2013 at 3:18 PM, Tim van Heugten <st...@gmail.com> wrote:

> Hi,
>
> It turns out the data in the two branches that are unioned in union2 is
> not mutually exclusive (counter to what I was expecting). Probably we
> should expect data duplication.
>
> However, this does still not explain why sometimes we find data
> duplication and sometimes we don't.
>
> Will keep you posted,
>
> Tim
>
>
> On Tue, Feb 5, 2013 at 11:32 AM, Tim van Heugten <st...@gmail.com> wrote:
>
>> Hi Gabriel,
>>
>> I've been unsuccessful so far to reproduce the issue in a controlled
>> environment. As said, its fragile, maybe the types involved play a role, so
>> when I tried to simplify those I broke the failure condition.
>> I decide it's time to try providing more information without giving an
>> explicit example.
>>
>> The pipeline we build is illustrated here: http://yuml.me/8ef99512.
>> Depending on where we materialize the data occurs twice in UP.
>> The EITPI job filters the exact opposite of the filter branch. In PWR
>> only data from EITPI is passed through, while the PITP data is used to
>> modify it.
>> Below you find the job names as executed when dataduplication occurs,
>> materializations occur before BTO(*) and after UP.
>>
>> "Avro(target/stored/sIPhase)+EITPI+GBK+PITEI+Avro(/tmp/crunch655004156/p4)"
>>
>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch655004156/p4)]]+GBK+PWR+UnionCollectionWrapper+Avro(/tmp/crunch655004156/p2)"
>>
>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch655004156/p4)]]+GBK+PWR+BTO+Avro(/tmp/crunch655004156/p8)"
>>
>> "[[Avro(target/stored/sIPhase)+S0+BTO]/[Avro(/tmp/crunch655004156/p8)]]+GBK+UP+Avro(/tmp/crunch655004156/p6)"
>> "Avro(/tmp/crunch655004156/p6)+GetData+Avro(/tmp/crunch655004156/p10)"
>> "Avro(/tmp/crunch655004156/p6)+GetTraces+Avro(target/trace-dump/traces)"
>>
>> Here are the jobs performed when materialization is added between BTO and
>> gbk:
>>
>> "Avro(target/stored/sIPhase)+EITPI+GBK+PITEI+Avro(/tmp/crunch-551174870/p4)"
>>
>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch-551174870/p4)]]+GBK+PWR+UnionCollectionWrapper+Avro(/tmp/crunch-551174870/p2)"
>>
>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch-551174870/p4)]]+GBK+PWR+BTO+Avro(/tmp/crunch-551174870/p6)"
>> "Avro(/tmp/crunch-551174870/p6)+GBK+UP+Avro(/tmp/crunch-551174870/p8)"
>> "Avro(/tmp/crunch-551174870/p8)+GetData+Avro(/tmp/crunch-551174870/p10)"
>> "Avro(/tmp/crunch-551174870/p8)+GetTraces+Avro(target/trace-dump/traces)"
>>
>> Without changing anything else, the added materialization fixes
>> the issue of data duplication.
>>
>> If you have any clues how I can extract a clean working example I'm happy
>> to hear.
>>
>>
>> *) This materialization probably explains the second job, however, where
>> the filtered data is joined is lost on me. This is not the cause though,
>> with just one materialize at the end, after UP, the data count still
>> doubled. The jobs then look like this:
>>
>> "Avro(target/stored/sIPhase)+EITPI+GBK+PITEI+Avro(/tmp/crunch369510677/p4)"
>>
>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch369510677/p4)]]+GBK+PWR+BTO+Avro(/tmp/crunch369510677/p6)"
>>
>> "[[Avro(target/stored/sIPhase)+S0+BTO]/[Avro(/tmp/crunch369510677/p6)]]+GBK+UP+Avro(/tmp/crunch369510677/p2)"
>> "Avro(/tmp/crunch369510677/p2)+GetTraces+Avro(target/trace-dump/traces)"
>> "Avro(/tmp/crunch369510677/p2)+GetData+Avro(/tmp/crunch369510677/p8)"
>>
>> BR,
>>
>> Tim van Heugten
>>
>>
>> On Thu, Jan 31, 2013 at 9:27 PM, Gabriel Reid <ga...@gmail.com>wrote:
>>
>>> Hi Tim,
>>>
>>> On 31 Jan 2013, at 10:45, Tim van Heugten <st...@gmail.com> wrote:
>>>
>>> > Hi Gabriel,
>>> >
>>> > For the most part it is similar to what was send around recently on
>>> this mailinglist, see:
>>> > From  Dave Beech <d....@paraliatech.com>
>>> > Subject       Question about mapreduce job planner
>>> > Date  Tue, 15 Jan 2013 11:41:42 GMT
>>> >
>>> > So, the common path before multiple outputs branch is executed twice.
>>> Sometimes the issues seem related to unions though, i.e. multiple inputs.
>>> We seem to have been troubled by a grouped table parallelDo on a
>>> table-union-gbk that got its data twice (all grouped doubled in size).
>>> Inserting a materialize between the union and groupByKey solved the issue.
>>> >
>>> > These issues seem very fragile (so they're fixed easily by changing
>>> something that's irrelevant to the output), so usually we just add or
>>> remove a materialization to make it run again.
>>> > I'll see if I can cleanly reproduce the data duplication issue later
>>> this week.
>>>
>>> Ok, that would be great if you could replicate it in a small test,
>>> thanks!
>>>
>>> - Gabriel
>>
>>
>>
>

Re: MemPipeline and context

Posted by Tim van Heugten <st...@gmail.com>.
Hi,

It turns out the data in the two branches that are unioned in union2 is not
mutually exclusive (counter to what I was expecting). Probably we should
expect data duplication.

However, this still does not explain why we sometimes find data duplication
and sometimes don't.

Will keep you posted,

Tim


On Tue, Feb 5, 2013 at 11:32 AM, Tim van Heugten <st...@gmail.com> wrote:

> Hi Gabriel,
>
> I've been unsuccessful so far to reproduce the issue in a controlled
> environment. As said, its fragile, maybe the types involved play a role, so
> when I tried to simplify those I broke the failure condition.
> I decide it's time to try providing more information without giving an
> explicit example.
>
> The pipeline we build is illustrated here: http://yuml.me/8ef99512.
> Depending on where we materialize the data occurs twice in UP.
> The EITPI job filters the exact opposite of the filter branch. In PWR only
> data from EITPI is passed through, while the PITP data is used to modify it.
> Below you find the job names as executed when dataduplication occurs,
> materializations occur before BTO(*) and after UP.
> "Avro(target/stored/sIPhase)+EITPI+GBK+PITEI+Avro(/tmp/crunch655004156/p4)"
>
> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch655004156/p4)]]+GBK+PWR+UnionCollectionWrapper+Avro(/tmp/crunch655004156/p2)"
>
> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch655004156/p4)]]+GBK+PWR+BTO+Avro(/tmp/crunch655004156/p8)"
>
> "[[Avro(target/stored/sIPhase)+S0+BTO]/[Avro(/tmp/crunch655004156/p8)]]+GBK+UP+Avro(/tmp/crunch655004156/p6)"
> "Avro(/tmp/crunch655004156/p6)+GetData+Avro(/tmp/crunch655004156/p10)"
> "Avro(/tmp/crunch655004156/p6)+GetTraces+Avro(target/trace-dump/traces)"
>
> Here are the jobs performed when materialization is added between BTO and
> gbk:
>
> "Avro(target/stored/sIPhase)+EITPI+GBK+PITEI+Avro(/tmp/crunch-551174870/p4)"
>
> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch-551174870/p4)]]+GBK+PWR+UnionCollectionWrapper+Avro(/tmp/crunch-551174870/p2)"
>
> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch-551174870/p4)]]+GBK+PWR+BTO+Avro(/tmp/crunch-551174870/p6)"
> "Avro(/tmp/crunch-551174870/p6)+GBK+UP+Avro(/tmp/crunch-551174870/p8)"
> "Avro(/tmp/crunch-551174870/p8)+GetData+Avro(/tmp/crunch-551174870/p10)"
> "Avro(/tmp/crunch-551174870/p8)+GetTraces+Avro(target/trace-dump/traces)"
>
> Without changing anything else, the added materialization fixes
> the issue of data duplication.
>
> If you have any clues how I can extract a clean working example I'm happy
> to hear.
>
>
> *) This materialization probably explains the second job, however, where
> the filtered data is joined is lost on me. This is not the cause though,
> with just one materialize at the end, after UP, the data count still
> doubled. The jobs then look like this:
> "Avro(target/stored/sIPhase)+EITPI+GBK+PITEI+Avro(/tmp/crunch369510677/p4)"
>
> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch369510677/p4)]]+GBK+PWR+BTO+Avro(/tmp/crunch369510677/p6)"
>
> "[[Avro(target/stored/sIPhase)+S0+BTO]/[Avro(/tmp/crunch369510677/p6)]]+GBK+UP+Avro(/tmp/crunch369510677/p2)"
> "Avro(/tmp/crunch369510677/p2)+GetTraces+Avro(target/trace-dump/traces)"
> "Avro(/tmp/crunch369510677/p2)+GetData+Avro(/tmp/crunch369510677/p8)"
>
> BR,
>
> Tim van Heugten
>
>
> On Thu, Jan 31, 2013 at 9:27 PM, Gabriel Reid <ga...@gmail.com>wrote:
>
>> Hi Tim,
>>
>> On 31 Jan 2013, at 10:45, Tim van Heugten <st...@gmail.com> wrote:
>>
>> > Hi Gabriel,
>> >
>> > For the most part it is similar to what was send around recently on
>> this mailinglist, see:
>> > From  Dave Beech <d....@paraliatech.com>
>> > Subject       Question about mapreduce job planner
>> > Date  Tue, 15 Jan 2013 11:41:42 GMT
>> >
>> > So, the common path before multiple outputs branch is executed twice.
>> Sometimes the issues seem related to unions though, i.e. multiple inputs.
>> We seem to have been troubled by a grouped table parallelDo on a
>> table-union-gbk that got its data twice (all grouped doubled in size).
>> Inserting a materialize between the union and groupByKey solved the issue.
>> >
>> > These issues seem very fragile (so they're fixed easily by changing
>> something that's irrelevant to the output), so usually we just add or
>> remove a materialization to make it run again.
>> > I'll see if I can cleanly reproduce the data duplication issue later
>> this week.
>>
>> Ok, that would be great if you could replicate it in a small test, thanks!
>>
>> - Gabriel
>
>
>

Re: MemPipeline and context

Posted by Tim van Heugten <st...@gmail.com>.
Hi Gabriel,

I've been unsuccessful so far in reproducing the issue in a controlled
environment. As said, it's fragile; maybe the types involved play a role, so
when I tried to simplify those I broke the failure condition.
I decided it's time to try providing more information without giving an
explicit example.

The pipeline we build is illustrated here: http://yuml.me/8ef99512.
Depending on where we materialize, the data occurs twice in UP.
The EITPI job filters the exact opposite of the filter branch. In PWR only
data from EITPI is passed through, while the PITP data is used to modify it.
Below you find the job names as executed when data duplication occurs;
materializations occur before BTO(*) and after UP.
"Avro(target/stored/sIPhase)+EITPI+GBK+PITEI+Avro(/tmp/crunch655004156/p4)"
"[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch655004156/p4)]]+GBK+PWR+UnionCollectionWrapper+Avro(/tmp/crunch655004156/p2)"
"[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch655004156/p4)]]+GBK+PWR+BTO+Avro(/tmp/crunch655004156/p8)"
"[[Avro(target/stored/sIPhase)+S0+BTO]/[Avro(/tmp/crunch655004156/p8)]]+GBK+UP+Avro(/tmp/crunch655004156/p6)"
"Avro(/tmp/crunch655004156/p6)+GetData+Avro(/tmp/crunch655004156/p10)"
"Avro(/tmp/crunch655004156/p6)+GetTraces+Avro(target/trace-dump/traces)"

Here are the jobs performed when materialization is added between BTO and
gbk:
"Avro(target/stored/sIPhase)+EITPI+GBK+PITEI+Avro(/tmp/crunch-551174870/p4)"
"[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch-551174870/p4)]]+GBK+PWR+UnionCollectionWrapper+Avro(/tmp/crunch-551174870/p2)"
"[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch-551174870/p4)]]+GBK+PWR+BTO+Avro(/tmp/crunch-551174870/p6)"
"Avro(/tmp/crunch-551174870/p6)+GBK+UP+Avro(/tmp/crunch-551174870/p8)"
"Avro(/tmp/crunch-551174870/p8)+GetData+Avro(/tmp/crunch-551174870/p10)"
"Avro(/tmp/crunch-551174870/p8)+GetTraces+Avro(target/trace-dump/traces)"

Without changing anything else, the added materialization fixes
the issue of data duplication.

If you have any clues as to how I can extract a clean working example, I'm
happy to hear them.


*) This materialization probably explains the second job; however, where
the filtered data is joined is lost on me. This is not the cause though:
with just one materialize at the end, after UP, the data count still
doubled. The jobs then look like this:
"Avro(target/stored/sIPhase)+EITPI+GBK+PITEI+Avro(/tmp/crunch369510677/p4)"
"[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch369510677/p4)]]+GBK+PWR+BTO+Avro(/tmp/crunch369510677/p6)"
"[[Avro(target/stored/sIPhase)+S0+BTO]/[Avro(/tmp/crunch369510677/p6)]]+GBK+UP+Avro(/tmp/crunch369510677/p2)"
"Avro(/tmp/crunch369510677/p2)+GetTraces+Avro(target/trace-dump/traces)"
"Avro(/tmp/crunch369510677/p2)+GetData+Avro(/tmp/crunch369510677/p8)"

BR,

Tim van Heugten


On Thu, Jan 31, 2013 at 9:27 PM, Gabriel Reid <ga...@gmail.com>wrote:

> Hi Tim,
>
> On 31 Jan 2013, at 10:45, Tim van Heugten <st...@gmail.com> wrote:
>
> > Hi Gabriel,
> >
> > For the most part it is similar to what was send around recently on this
> mailinglist, see:
> > From  Dave Beech <d....@paraliatech.com>
> > Subject       Question about mapreduce job planner
> > Date  Tue, 15 Jan 2013 11:41:42 GMT
> >
> > So, the common path before multiple outputs branch is executed twice.
> Sometimes the issues seem related to unions though, i.e. multiple inputs.
> We seem to have been troubled by a grouped table parallelDo on a
> table-union-gbk that got its data twice (all grouped doubled in size).
> Inserting a materialize between the union and groupByKey solved the issue.
> >
> > These issues seem very fragile (so they're fixed easily by changing
> something that's irrelevant to the output), so usually we just add or
> remove a materialization to make it run again.
> > I'll see if I can cleanly reproduce the data duplication issue later
> this week.
>
> Ok, that would be great if you could replicate it in a small test, thanks!
>
> - Gabriel

Re: MemPipeline and context

Posted by Gabriel Reid <ga...@gmail.com>.
Hi Tim,

On 31 Jan 2013, at 10:45, Tim van Heugten <st...@gmail.com> wrote:

> Hi Gabriel,
> 
> For the most part it is similar to what was send around recently on this mailinglist, see:
> From	Dave Beech <d....@paraliatech.com>
> Subject	Question about mapreduce job planner
> Date	Tue, 15 Jan 2013 11:41:42 GMT
> 
> So, the common path before multiple outputs branch is executed twice. Sometimes the issues seem related to unions though, i.e. multiple inputs. We seem to have been troubled by a grouped table parallelDo on a table-union-gbk that got its data twice (all grouped doubled in size). Inserting a materialize between the union and groupByKey solved the issue.
> 
> These issues seem very fragile (so they're fixed easily by changing something that's irrelevant to the output), so usually we just add or remove a materialization to make it run again.
> I'll see if I can cleanly reproduce the data duplication issue later this week.

Ok, that would be great if you could replicate it in a small test, thanks!

- Gabriel

Re: MemPipeline and context

Posted by Tim van Heugten <st...@gmail.com>.
Hi Gabriel,

For the most part it is similar to what was sent around recently on this
mailing list, see:
From: Dave Beech <d....@paraliatech.com>
Subject: Question about mapreduce job planner
Date: Tue, 15 Jan 2013 11:41:42 GMT

So, the common path before the branch into multiple outputs is executed twice.
Sometimes the issues seem related to unions though, i.e. multiple inputs.
We seem to have been troubled by a grouped-table parallelDo on a
table-union-gbk that got its data twice (all groups doubled in size).
Inserting a materialize between the union and the groupByKey solved the issue.
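
Concretely, the workaround looks more or less like this (a sketch with
placeholder type parameters, not our actual job):

import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;

public class UnionGbkWorkaround {

  // left and right stand for the two branches that feed the union in our pipeline.
  static <K, V> PGroupedTable<K, V> unionThenGroup(PTable<K, V> left, PTable<K, V> right) {
    PTable<K, V> unioned = left.union(right);

    // The materialize() between the union and the groupByKey is only there as a
    // hint to the planner; we ignore the returned Iterable. Without it the grouped
    // output sometimes contained the data twice in our environment.
    unioned.materialize();

    return unioned.groupByKey();
  }
}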

These issues seem very fragile (they're easily fixed by changing
something that's irrelevant to the output), so usually we just add or
remove a materialization to make it run again.
I'll see if I can cleanly reproduce the data duplication issue later this
week.


Cheers,

Tim


On Wed, Jan 30, 2013 at 8:51 PM, Gabriel Reid <ga...@gmail.com>wrote:

> Hi Tim,
>
> On Wed, Jan 30, 2013 at 10:33 AM, Tim van Heugten <st...@gmail.com>wrote:
>
>>
>> Since april I'm using Crunch for a project. We're not doing only linear
>> executions of the pipeline, so we're sometimes having issues with how
>> Crunch is optimizing our execution graph. We need to add materializations
>> here and there as hints to what parts of the graph can be shared for
>> outputs and so on.
>>
>
> About the extra calls to materialize to force changes to the execution
> plan: I remember seeing this previously. We've discussed adding something
> specifically for this functionality to the API, although it hasn't yet
> happened.
>
> Could you give an example of a situation where these extra materialize
> calls get added? That would be useful for validating the addition to the
> API.
>
>
> Thanks,
>
> Gabriel
>
>

Re: MemPipeline and context

Posted by Gabriel Reid <ga...@gmail.com>.
Hi Tim,

On Wed, Jan 30, 2013 at 10:33 AM, Tim van Heugten <st...@gmail.com> wrote:

>
> Since april I'm using Crunch for a project. We're not doing only linear
> executions of the pipeline, so we're sometimes having issues with how
> Crunch is optimizing our execution graph. We need to add materializations
> here and there as hints to what parts of the graph can be shared for
> outputs and so on.
>

About the extra calls to materialize to force changes to the execution
plan: I remember seeing this previously. We've discussed adding something
specifically for this functionality to the API, although it hasn't yet
happened.

Could you give an example of a situation where these extra materialize
calls get added? That would be useful for validating the addition to the
API.


Thanks,

Gabriel