Posted to user@beam.apache.org by Pawel Szczur <pa...@gmail.com> on 2016/05/27 18:52:39 UTC

Items not grouped correctly - CoGroupByKey - FlinkPipelineRunner

*Data description.*

I have two datasets.

Records - the first, contains around 0.5-1M records per (key, day). For
testing I use 2-3 keys and 5-10 days of data. What I'm aiming for is 1000+
keys. Each record contains a key, a timestamp in μ-seconds and some other data.
Configs - the second, is rather small. It describes the key over time, e.g.
you can think of it as a list of tuples: (key, start date, end date,
description).

For the exploration I've encoded the data as files of length-prefixed
Protocol Buffer binary encoded messages. Additionally the files are packed
with gzip. Data is sharded by date. Each file is around 10MB.

*Pipeline*

First I add keys to both datasets. For the Records dataset the key is (key,
timestamp rounded down to the day). For Configs the key is (key, day), where
day takes each midnight timestamp between the start date and the end date.
The datasets are merged using CoGroupByKey.

As the key type I use org.apache.flink.api.java.tuple.Tuple2 with a
Tuple2Coder from this repo.
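
A minimal sketch of the key construction described above. It assumes that
days are UTC days and that the end date is inclusive; neither is stated in
the post. DayKeys and its method names are hypothetical and are not taken
from the actual pipeline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for building (key, day) keys; illustrative only. */
final class DayKeys {
    /** Number of microseconds in one day. */
    static final long USEC_PER_DAY = TimeUnit.DAYS.toMicros(1);

    /** Rounds a microsecond timestamp down to midnight (UTC assumed). */
    static long floorToDayUsec(long timestampUsec) {
        return Math.floorDiv(timestampUsec, USEC_PER_DAY) * USEC_PER_DAY;
    }

    /**
     * Lists every midnight from the day of startUsec up to endUsec.
     * Treating the end date as inclusive is an assumption.
     */
    static List<Long> daysBetween(long startUsec, long endUsec) {
        List<Long> days = new ArrayList<>();
        for (long day = floorToDayUsec(startUsec); day <= endUsec; day += USEC_PER_DAY) {
            days.add(day);
        }
        return days;
    }

    public static void main(String[] args) {
        // A Record timestamp shortly after midnight maps to that midnight.
        System.out.println(floorToDayUsec(1462665600000123L));
        // A Config valid over four days yields one key per day.
        System.out.println(daysBetween(1462665600000000L, 1462924800000000L));
    }
}
```

A Record would then be keyed by (key, floorToDayUsec(timestamp)) and a Config
would be emitted once per element of daysBetween(start, end), so that both
sides meet on the same (key, day) pair.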

*The problem*

If the Records dataset is tiny, like 5 days, everything seems fine (see
normal_run.log).

 INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
 INFO [main] (FlinkPipelineRunner.java:127) - item count : 4322332
 INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 0
 INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0

When I run the pipeline against 10+ days, I encounter an error indicating
that for some Records there's no Config (wrong_run.log).

 INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
 INFO [main] (FlinkPipelineRunner.java:127) - item count : 8577197
 INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 6
 INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0

Then I added some extra logging messages:

(ConvertToItem.java:144) - 68643 items for KeyValue3 on: 1462665600000000
(ConvertToItem.java:140) - no items for KeyValue3 on: 1463184000000000
(ConvertToItem.java:123) - missing for KeyValue3 on: 1462924800000000
(ConvertToItem.java:142) - 753707 items for KeyValue3 on: 1462924800000000 marked as no-loc
(ConvertToItem.java:123) - missing for KeyValue3 on: 1462752000000000
(ConvertToItem.java:142) - 749901 items for KeyValue3 on: 1462752000000000 marked as no-loc
(ConvertToItem.java:144) - 754578 items for KeyValue3 on: 1462406400000000
(ConvertToItem.java:144) - 751574 items for KeyValue3 on: 1463011200000000
(ConvertToItem.java:123) - missing for KeyValue3 on: 1462665600000000
(ConvertToItem.java:142) - 754758 items for KeyValue3 on: 1462665600000000 marked as no-loc
(ConvertToItem.java:123) - missing for KeyValue3 on: 1463184000000000
(ConvertToItem.java:142) - 694372 items for KeyValue3 on: 1463184000000000 marked as no-loc

You can see that in the first line 68643 items were processed for KeyValue3
and time 1462665600000000.
Later on, in line 9, the operation seems to process the same key again, but
it reports that no Config was available for these Records.
Line 10 says they've been marked as no-loc.

Line 2 says that there were no items for KeyValue3 and time
1463184000000000, but in line 11 you can see that the items for this
(key, day) pair were processed later and lacked a Config.
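
The invariant that the log appears to violate is that each (key, day) pair
reaches the processing step exactly once. A minimal sketch of that check
follows. GroupAudit is a hypothetical helper, not part of the pipeline, and
the sample input lists the days from the log excerpt above in order of
appearance, assuming that a "missing" line and the following "items" line
belong to the same call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that reports keys handed to a grouped step more than once. */
final class GroupAudit {
    /** Returns the keys occurring more than once, in order of first appearance. */
    static <K> List<K> keysSeenMoreThanOnce(List<K> emittedKeys) {
        // LinkedHashMap keeps first-appearance order for readable output.
        Map<K, Integer> counts = new LinkedHashMap<>();
        for (K key : emittedKeys) {
            counts.merge(key, 1, Integer::sum);
        }
        List<K> repeated = new ArrayList<>();
        for (Map.Entry<K, Integer> entry : counts.entrySet()) {
            if (entry.getValue() > 1) {
                repeated.add(entry.getKey());
            }
        }
        return repeated;
    }

    public static void main(String[] args) {
        // Days for KeyValue3, one entry per processing call in the log excerpt.
        List<Long> days = Arrays.asList(
                1462665600000000L, 1463184000000000L, 1462924800000000L,
                1462752000000000L, 1462406400000000L, 1463011200000000L,
                1462665600000000L, 1463184000000000L);
        // Prints [1462665600000000, 1463184000000000]
        System.out.println(keysSeenMoreThanOnce(days));
    }
}
```

With a correct grouping the returned list would be empty; here it contains
the two days discussed above.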

*Work-around (after more testing, doesn't work, staying with Tuple2)*

I've switched from using Tuple2 to a Protocol Buffer message:

message KeyDay {
  optional bytes key = 1;
  optional int64 timestamp_usec = 2;
}

But using Tuple2.of() was just easier than:
KeyDay.newBuilder().setKey(...).setTimestampUsec(...).build().

// The original description comes from:
http://stackoverflow.com/questions/37473682/items-not-groupped-correctly-cogroupbykey

Re: Items not grouped correctly - CoGroupByKey - FlinkPipelineRunner

Posted by Maximilian Michels <mx...@apache.org>.
Thank you for opening the issue and providing the example program to
reproduce the problem. Yes, it appears to be a bug in the Runner.
We're looking into it.

On Tue, May 31, 2016 at 3:47 PM, Pawel Szczur <pa...@gmail.com> wrote:
> FYI I've opened an issue: https://issues.apache.org/jira/browse/BEAM-315
>
> 2016-05-31 14:51 GMT+02:00 Pawel Szczur <pa...@gmail.com>:
>>
>> I've also added a test for GroupByKey. It fails. That kind of makes the Flink
>> runner broken at the moment, doesn't it?
>>
>> I'm wondering: could it be related to some windowing issue?
>>
>> 2016-05-31 14:40 GMT+02:00 Pawel Szczur <pa...@gmail.com>:
>>>
>>> I've just tested it. It fails.
>>>
>>> Also added the test to the repo:
>>> https://github.com/orian/cogroup-wrong-grouping
>>>
>>> I suppose this means that GroupByKey is flawed? If you open an official
>>> issue, please add this to the discussion.
>>>
>>> 2016-05-31 11:55 GMT+02:00 Aljoscha Krettek <al...@apache.org>:
>>>>
>>>> Does 2. work for the cases where CoGroupByKey fails? Reason I'm asking
>>>> is that CoGroupByKey is essentially implemented like that internally: create
>>>> tagged union -> flatten -> GroupByKey.
>>>>
>>>> On Tue, 31 May 2016 at 01:16 Pawel Szczur <pa...@gmail.com> wrote:
>>>>>
>>>>> I've naively tried a few other key types; it seems to be unrelated to the
>>>>> key type.
>>>>>
>>>>> For now I have two workarounds, plus simply ignoring the bug:
>>>>>  1. If there is one dominant dataset and the other datasets are small (size
>>>>> << GB), I use a SideInput.
>>>>>  2. If I have multiple datasets of similar size, I enclose them in a
>>>>> common container, flatten them and apply GroupByKey.
>>>>>  3. I measure occurrences and ignore the bug for now.
>>>>>
>>>>> Do you have an idea how a test for this could be constructed? It would be
>>>>> handy, I think.
>>>>>
>>>>> I also found two things that may help you:
>>>>>  1. The issue doesn't appear without parallelism.
>>>>>  2. The issue doesn't appear with tiny datasets.
>>>>>
>>>>> 2016-05-30 17:13 GMT+02:00 Aljoscha Krettek <al...@apache.org>:
>>>>>>
>>>>>> You're right. I'm still looking into this, unfortunately I haven't
>>>>>> made progress so far. I'll keep you posted.
>>>>>>
>>>>>> On Sun, 29 May 2016 at 18:20 Pawel Szczur <pa...@gmail.com>
>>>>>> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I used the config as in the repo.
>>>>>>> Please grep the log for "hereGoesLongStringID0,2"; you will see
>>>>>>> that this key is processed multiple times.
>>>>>>>
>>>>>>> This is how I understand CoGroupByKey: one has two (or more)
>>>>>>> PCollection<KV<K,?>>. Both sets are grouped by key. For each unique key a
>>>>>>> KV<K, CoGbkResult> is produced; a given CoGbkResult contains all values from
>>>>>>> all input PCollections which have the given key.
>>>>>>>
>>>>>>> But from the log it seems that each key produced more than one
>>>>>>> CoGbkResult.
>>>>>>>
>>>>>>> The final counters didn't catch the bug because in your case, the
>>>>>>> value from dataset1 was replicated for each key.
>>>>>>>
>>>>>>> Cheers, Pawel
>>>>>>>
>>>>>>> 2016-05-29 15:59 GMT+02:00 Aljoscha Krettek <al...@apache.org>:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>> I ran your data generator with these configs:
>>>>>>>> p.apply(Create.of(new Config(3, 5, 600_000, 1)))
>>>>>>>>     .apply(ParDo.of(new Generator()))
>>>>>>>>     .apply(AvroIO.Write.to("/tmp/dataset1")
>>>>>>>>         .withSchema(DumbData.class)
>>>>>>>>         .withNumShards(6));
>>>>>>>>
>>>>>>>> p.apply(Create.of(new Config(3, 5, 600_000, 2)))
>>>>>>>>     .apply(ParDo.of(new Generator()))
>>>>>>>>     .apply(AvroIO.Write.to("/tmp/dataset2")
>>>>>>>>         .withSchema(DumbData.class)
>>>>>>>>         .withNumShards(6));
>>>>>>>>
>>>>>>>> Then I ran the job with parallelism=6. I couldn't reproduce the
>>>>>>>> problem, this is the log file from one of several runs:
>>>>>>>> https://gist.github.com/aljoscha/ef1d804f57671cd472c75b92b4aee51b
>>>>>>>>
>>>>>>>> Could you please send me the exact config that you used? Btw, I ran
>>>>>>>> it inside an IDE. Do the problems also occur in the IDE for you or only when
>>>>>>>> you execute on a cluster?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>> On Sun, 29 May 2016 at 01:51 Pawel Szczur <pa...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi Aljoscha.
>>>>>>>>>
>>>>>>>>> I've created a repo with a fake dataset to make the problem easy to
>>>>>>>>> reproduce:
>>>>>>>>> https://github.com/orian/cogroup-wrong-grouping
>>>>>>>>>
>>>>>>>>> What I noticed: if the dataset is too small the bug doesn't appear.
>>>>>>>>>
>>>>>>>>> You can modify the size of the dataset, but ideally it should be a
>>>>>>>>> few hundred thousand records per key (I guess it depends on the machine you
>>>>>>>>> run it on).
>>>>>>>>>
>>>>>>>>> Cheers, Pawel
>>>>>>>>>
>>>>>>>>> 2016-05-28 12:45 GMT+02:00 Aljoscha Krettek <al...@apache.org>:
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> which version of Beam/Flink are you using?
>>>>>>>>>>
>>>>>>>>>> Could you maybe also provide example data and code that showcases
>>>>>>>>>> the problem? If you have concerns about sending it to a public list you can
>>>>>>>>>> also send it to me directly.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Aljoscha
>>>>>>>>>>

Re: Items not grouped correctly - CoGroupByKey - FlinkPipelineRunner

Posted by Aljoscha Krettek <al...@apache.org>.
It might be an issue in the Flink runner, yes. It's hard to test it on the
DirectPipelineRunner or the InProcessRunner, though, since they both fail
with OOM exceptions once data gets sufficiently large to make the bug
appear.


Re: Items not grouped correctly - CoGroupByKey - FlinkPipelineRunner

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
It sounds like a bug in the Flink runner. I'm pretty sure Max and 
Aljoscha will fix that soon ;)

Regards
JB

On 05/31/2016 03:47 PM, Pawel Szczur wrote:
> FYI I've opened an issue: https://issues.apache.org/jira/browse/BEAM-315
>
> 2016-05-31 14:51 GMT+02:00 Pawel Szczur <pawelszczur@gmail.com
> <ma...@gmail.com>>:
>
>     I've also added the test for GroupByKey. It fails. It kind of makes
>     Flink broken at the moment, isn't it?
>
>     I'm wondering.. may it be related to some Windowing issue?
>
>     2016-05-31 14:40 GMT+02:00 Pawel Szczur <pawelszczur@gmail.com
>     <ma...@gmail.com>>:
>
>         I've just tested it. It fails.
>
>         Also added the test to the repo:
>         https://github.com/orian/cogroup-wrong-grouping
>
>         I reason, this means that GroupByKey is flawed? If you open an
>         official issue, please add it to discussion.
>
>         2016-05-31 11:55 GMT+02:00 Aljoscha Krettek <aljoscha@apache.org
>         <ma...@apache.org>>:
>
>             Does 2. work for the cases where CoGroupByKey fails? Reason
>             I'm asking is that CoGroupByKey is essentially implemented
>             like that internally: create tagged union -> flatten ->
>             GroupByKey.
>
>             On Tue, 31 May 2016 at 01:16 Pawel Szczur
>             <pawelszczur@gmail.com <ma...@gmail.com>> wrote:
>
>                 I've naively tried few other key types, it seems to be
>                 unrelated to key type.
>
>                 As for now I have two workarounds and ignorance:
>                   1. If there is one dominant dataset and other datasets
>                 are small (size << GB) then I use SideInput.
>                   2. If I have multiple datasets of similar size I
>                 enclose it in a common container, flatten it and GroupByKey.
>                   3. I measure occurrences and ignore the bug for now.
>
>                 Do you have an idea how a test for this may be
>                 constructed? It seems handy, I think.
>
>                 I also found two things, maybe they help you:
>                   1. issue doesn't appear without parallelism
>                   2. issue doesn't appear with a tiny datasets
>
>                 2016-05-30 17:13 GMT+02:00 Aljoscha Krettek
>                 <aljoscha@apache.org <ma...@apache.org>>:
>
>                     You're right. I'm still looking into this,
>                     unfortunately I haven't made progress so far. I'll
>                     keep you posted.
>
>                     On Sun, 29 May 2016 at 18:20 Pawel Szczur
>                     <pawelszczur@gmail.com
>                     <ma...@gmail.com>> wrote:
>
>                         Hi,
>
>                         I used the config as in the repo.
>                         Please grep the the log for
>                         "hereGoesLongStringID0,2", you will see that
>                         this key is processed multiple times.
>
>                         This is how I understand CoGroupByKey: one has
>                         two (or more) PCollection<KV<K,?>>. Both sets
>                         are grouped by key. For each unique key a KV<K,
>                         CoGbkResult> is produced, a given CoGbkResult
>                         contains all values from all input PCollections
>                         which have the given key.
>
>                         But from the log it seems that each key produced
>                         more than one CoGbkResult.
>
>                         The final counters didn't catch the bug because
>                         in your case, the value from dataset1 was
>                         replicated for each key.
>
>                         Cheers, Pawel
>
>                         2016-05-29 15:59 GMT+02:00 Aljoscha Krettek
>                         <aljoscha@apache.org <ma...@apache.org>>:
>
>                             Hi,
>                             I ran your data generator with these configs:
>                             p.apply(Create.of(new Config(3, 5, 600_000, 1)))
>                                  .apply(ParDo.of(new Generator())).apply(
>                             AvroIO.Write.to
>                             <http://AvroIO.Write.to>("/tmp/dataset1").withSchema(DumbData.class).withNumShards(6));
>
>                             p.apply(Create.of(new Config(3, 5, 600_000,
>                             2))).
>                                  apply(ParDo.of(new Generator())).apply(
>                             AvroIO.Write.to
>                             <http://AvroIO.Write.to>("/tmp/dataset2").withSchema(DumbData.class).withNumShards(6));
>
>                             Then I ran the job with parallelism=6. I
>                             couldn't reproduce the problem, this is the
>                             log file from one of several runs:
>                             https://gist.github.com/aljoscha/ef1d804f57671cd472c75b92b4aee51b
>
>                             Could you please send me the exact config
>                             that you used. Btw, I ran it inside an IDE,
>                             do the problems also occur in the IDE for
>                             you or only when you execute on a cluster?
>
>                             Cheers,
>                             Aljoscha
>
>                             On Sun, 29 May 2016 at 01:51 Pawel Szczur
>                             <pawelszczur@gmail.com
>                             <ma...@gmail.com>> wrote:
>
>                                 Hi Aljoscha.
>
>                                 I've created a repo with fake dataset to
>                                 allow easily reproduce the problem:
>                                 https://github.com/orian/cogroup-wrong-grouping
>
>                                 What I noticed: if the dataset is too
>                                 small the bug doesn't appear.
>
>                                 You can modify the size of dataset, but
>                                 in ideal case it should be few hundred
>                                 thousands records per key (I guess it
>                                 depends on the machine you run it).
>
>                                 Cheers, Pawel
>
>                                 2016-05-28 12:45 GMT+02:00 Aljoscha
>                                 Krettek <aljoscha@apache.org
>                                 <ma...@apache.org>>:
>
>                                     Hi,
>                                     which version of Beam/Flink are you
>                                     using.
>
>                                     Could you maybe also provide example
>                                     data and code that showcases the
>                                     problem? If you have concerns about
>                                     sending it to a public list you can
>                                     also send it to me directly.
>
>                                     Cheers,
>                                     Aljoscha
>

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: Items not groupped correctly - CoGroupByKey - FlinkPipelienRunner

Posted by Pawel Szczur <pa...@gmail.com>.
FYI I've opened an issue: https://issues.apache.org/jira/browse/BEAM-315

2016-05-31 14:51 GMT+02:00 Pawel Szczur <pa...@gmail.com>:

> I've also added a test for GroupByKey. It fails. That kind of makes Flink
> broken at the moment, doesn't it?
>
> I'm wondering: could it be related to some windowing issue?
>
> 2016-05-31 14:40 GMT+02:00 Pawel Szczur <pa...@gmail.com>:
>
>> I've just tested it. It fails.
>>
>> Also added the test to the repo:
>> https://github.com/orian/cogroup-wrong-grouping
>>
>> So I reason this means that GroupByKey itself is flawed? If you open an
>> official issue, please add this to the discussion.
>>
>> 2016-05-31 11:55 GMT+02:00 Aljoscha Krettek <al...@apache.org>:
>>
>>> Does 2. work for the cases where CoGroupByKey fails? Reason I'm asking
>>> is that CoGroupByKey is essentially implemented like that internally:
>>> create tagged union -> flatten -> GroupByKey.
>>>
>>> On Tue, 31 May 2016 at 01:16 Pawel Szczur <pa...@gmail.com> wrote:
>>>
>>>> I've naively tried a few other key types; it seems to be unrelated to
>>>> the key type.
>>>>
>>>> For now I have two workarounds, plus simply ignoring it:
>>>>  1. If there is one dominant dataset and the other datasets are small
>>>> (size << GB), then I use SideInput.
>>>>  2. If I have multiple datasets of similar size, I enclose them in a
>>>> common container, flatten them and apply GroupByKey.
>>>>  3. I measure occurrences and ignore the bug for now.
>>>>
>>>> Do you have an idea how a test for this could be constructed? It would
>>>> be handy, I think.
>>>>
>>>> I also found two things; maybe they help you:
>>>>  1. the issue doesn't appear without parallelism
>>>>  2. the issue doesn't appear with tiny datasets
>>>>
>>>> 2016-05-30 17:13 GMT+02:00 Aljoscha Krettek <al...@apache.org>:
>>>>
>>>>> You're right. I'm still looking into this, unfortunately I haven't
>>>>> made progress so far. I'll keep you posted.
>>>>>
>>>>> On Sun, 29 May 2016 at 18:20 Pawel Szczur <pa...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I used the config as in the repo.
>>>>>> Please grep the log for "hereGoesLongStringID0,2"; you will see
>>>>>> that this key is processed multiple times.
>>>>>>
>>>>>> This is how I understand CoGroupByKey: one has two (or more)
>>>>>> PCollection<KV<K,?>>. Both sets are grouped by key. For each unique key a
>>>>>> KV<K, CoGbkResult> is produced; a given CoGbkResult contains all values
>>>>>> from all input PCollections which have the given key.
>>>>>>
>>>>>> But from the log it seems that each key produced more than one
>>>>>> CoGbkResult.
>>>>>>
>>>>>> The final counters didn't catch the bug because in your case, the
>>>>>> value from dataset1 was replicated for each key.
>>>>>>
>>>>>> Cheers, Pawel
>>>>>>
>>>>>> 2016-05-29 15:59 GMT+02:00 Aljoscha Krettek <al...@apache.org>:
>>>>>>
>>>>>>> Hi,
>>>>>>> I ran your data generator with these configs:
>>>>>>> p.apply(Create.of(new Config(3, 5, 600_000, 1)))
>>>>>>>     .apply(ParDo.of(new Generator()))
>>>>>>>     .apply(AvroIO.Write.to("/tmp/dataset1")
>>>>>>>         .withSchema(DumbData.class).withNumShards(6));
>>>>>>>
>>>>>>> p.apply(Create.of(new Config(3, 5, 600_000, 2)))
>>>>>>>     .apply(ParDo.of(new Generator()))
>>>>>>>     .apply(AvroIO.Write.to("/tmp/dataset2")
>>>>>>>         .withSchema(DumbData.class).withNumShards(6));
>>>>>>>
>>>>>>> Then I ran the job with parallelism=6. I couldn't reproduce the
>>>>>>> problem, this is the log file from one of several runs:
>>>>>>> https://gist.github.com/aljoscha/ef1d804f57671cd472c75b92b4aee51b
>>>>>>>
>>>>>>> Could you please send me the exact config that you used? Btw, I ran
>>>>>>> it inside an IDE; do the problems also occur in the IDE for you, or only
>>>>>>> when you execute on a cluster?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> On Sun, 29 May 2016 at 01:51 Pawel Szczur <pa...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Aljoscha.
>>>>>>>>
>>>>>>>> I've created a repo with a fake dataset to make the problem easy to
>>>>>>>> reproduce:
>>>>>>>> https://github.com/orian/cogroup-wrong-grouping
>>>>>>>>
>>>>>>>> What I noticed: if the dataset is too small the bug doesn't appear.
>>>>>>>>
>>>>>>>> You can modify the size of the dataset, but ideally it should be a
>>>>>>>> few hundred thousand records per key (I guess it depends on the machine
>>>>>>>> you run it on).
>>>>>>>>
>>>>>>>> Cheers, Pawel
>>>>>>>>
>>>>>>>> 2016-05-28 12:45 GMT+02:00 Aljoscha Krettek <al...@apache.org>:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> which version of Beam/Flink are you using?
>>>>>>>>>
>>>>>>>>> Could you maybe also provide example data and code that showcases
>>>>>>>>> the problem? If you have concerns about sending it to a public list you can
>>>>>>>>> also send it to me directly.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Aljoscha
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>
>>
>

Re: Items not groupped correctly - CoGroupByKey - FlinkPipelienRunner

Posted by Pawel Szczur <pa...@gmail.com>.
I've also added a test for GroupByKey. It fails. That kind of makes Flink
broken at the moment, doesn't it?

I'm wondering: could it be related to some windowing issue?
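
The invariant such a test checks can be stated without any runner: within a
single window, a grouping step should hand each key to the downstream code
exactly once. Below is a minimal sketch in plain Java, not the test from the
repo. The class and method names (GroupedKeyCheck, duplicatedKeys) and the
second key in main are made up for illustration, and the grouped output is
modelled simply as the list of keys the downstream step was called with:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class GroupedKeyCheck {

    /**
     * Returns every key that was emitted by the grouping step more than once.
     * An empty result means each key formed exactly one group.
     */
    public static Set<String> duplicatedKeys(List<String> emittedKeys) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicated = new TreeSet<>(); // sorted, for stable reporting
        for (String key : emittedKeys) {
            // Set.add returns false when the key was already present.
            if (!seen.add(key)) {
                duplicated.add(key);
            }
        }
        return duplicated;
    }

    public static void main(String[] args) {
        // Hypothetical observation: the first key reaches the downstream step twice.
        List<String> emitted = Arrays.asList(
                "hereGoesLongStringID0,2",
                "hereGoesLongStringID1,2", // made-up second key
                "hereGoesLongStringID0,2");
        Set<String> duplicated = duplicatedKeys(emitted);
        if (!duplicated.isEmpty()) {
            System.out.println("keys grouped more than once: " + duplicated);
        }
    }
}
```

In a real pipeline the list of emitted keys would have to be collected from
the step that consumes the grouped output; how to do that depends on the
runner and is not shown here.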

2016-05-31 14:40 GMT+02:00 Pawel Szczur <pa...@gmail.com>:

> I've just tested it. It fails.
>
> Also added the test to the repo:
> https://github.com/orian/cogroup-wrong-grouping
>
> So I reason this means that GroupByKey itself is flawed? If you open an
> official issue, please add this to the discussion.
>
> 2016-05-31 11:55 GMT+02:00 Aljoscha Krettek <al...@apache.org>:
>
>> Does 2. work for the cases where CoGroupByKey fails? Reason I'm asking is
>> that CoGroupByKey is essentially implemented like that internally: create
>> tagged union -> flatten -> GroupByKey.
>>
>> On Tue, 31 May 2016 at 01:16 Pawel Szczur <pa...@gmail.com> wrote:
>>
>>> I've naively tried a few other key types; it seems to be unrelated to
>>> the key type.
>>>
>>> For now I have two workarounds, plus simply ignoring it:
>>>  1. If there is one dominant dataset and the other datasets are small
>>> (size << GB), then I use SideInput.
>>>  2. If I have multiple datasets of similar size, I enclose them in a
>>> common container, flatten them and apply GroupByKey.
>>>  3. I measure occurrences and ignore the bug for now.
>>>
>>> Do you have an idea how a test for this could be constructed? It would
>>> be handy, I think.
>>>
>>> I also found two things; maybe they help you:
>>>  1. the issue doesn't appear without parallelism
>>>  2. the issue doesn't appear with tiny datasets
>>>
>>> 2016-05-30 17:13 GMT+02:00 Aljoscha Krettek <al...@apache.org>:
>>>
>>>> You're right. I'm still looking into this, unfortunately I haven't made
>>>> progress so far. I'll keep you posted.
>>>>
>>>> On Sun, 29 May 2016 at 18:20 Pawel Szczur <pa...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I used the config as in the repo.
>>>>> Please grep the log for "hereGoesLongStringID0,2"; you will see
>>>>> that this key is processed multiple times.
>>>>>
>>>>> This is how I understand CoGroupByKey: one has two (or more)
>>>>> PCollection<KV<K,?>>. Both sets are grouped by key. For each unique key a
>>>>> KV<K, CoGbkResult> is produced; a given CoGbkResult contains all values
>>>>> from all input PCollections which have the given key.
>>>>>
>>>>> But from the log it seems that each key produced more than one
>>>>> CoGbkResult.
>>>>>
>>>>> The final counters didn't catch the bug because in your case, the
>>>>> value from dataset1 was replicated for each key.
>>>>>
>>>>> Cheers, Pawel
>>>>>
>>>>> 2016-05-29 15:59 GMT+02:00 Aljoscha Krettek <al...@apache.org>:
>>>>>
>>>>>> Hi,
>>>>>> I ran your data generator with these configs:
>>>>>> p.apply(Create.of(new Config(3, 5, 600_000, 1)))
>>>>>>     .apply(ParDo.of(new Generator()))
>>>>>>     .apply(AvroIO.Write.to("/tmp/dataset1")
>>>>>>         .withSchema(DumbData.class).withNumShards(6));
>>>>>>
>>>>>> p.apply(Create.of(new Config(3, 5, 600_000, 2)))
>>>>>>     .apply(ParDo.of(new Generator()))
>>>>>>     .apply(AvroIO.Write.to("/tmp/dataset2")
>>>>>>         .withSchema(DumbData.class).withNumShards(6));
>>>>>>
>>>>>> Then I ran the job with parallelism=6. I couldn't reproduce the
>>>>>> problem, this is the log file from one of several runs:
>>>>>> https://gist.github.com/aljoscha/ef1d804f57671cd472c75b92b4aee51b
>>>>>>
>>>>>> Could you please send me the exact config that you used? Btw, I ran
>>>>>> it inside an IDE; do the problems also occur in the IDE for you, or only
>>>>>> when you execute on a cluster?
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>
>>>>>> On Sun, 29 May 2016 at 01:51 Pawel Szczur <pa...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Aljoscha.
>>>>>>>
>>>>>>> I've created a repo with a fake dataset to make the problem easy to
>>>>>>> reproduce:
>>>>>>> https://github.com/orian/cogroup-wrong-grouping
>>>>>>>
>>>>>>> What I noticed: if the dataset is too small the bug doesn't appear.
>>>>>>>
>>>>>>> You can modify the size of the dataset, but ideally it should be a
>>>>>>> few hundred thousand records per key (I guess it depends on the machine
>>>>>>> you run it on).
>>>>>>>
>>>>>>> Cheers, Pawel
>>>>>>>
>>>>>>> 2016-05-28 12:45 GMT+02:00 Aljoscha Krettek <al...@apache.org>:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> which version of Beam/Flink are you using?
>>>>>>>>
>>>>>>>> Could you maybe also provide example data and code that showcases
>>>>>>>> the problem? If you have concerns about sending it to a public list you can
>>>>>>>> also send it to me directly.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>

Re: Items not groupped correctly - CoGroupByKey - FlinkPipelienRunner

Posted by Pawel Szczur <pa...@gmail.com>.
I've just tested it. It fails.

Also added the test to the repo:
https://github.com/orian/cogroup-wrong-grouping

So I reason this means that GroupByKey itself is flawed? If you open an
official issue, please add this to the discussion.
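
For reference, workaround 2 follows the same steps the quoted reply below
gives for CoGroupByKey: create tagged union -> flatten -> GroupByKey. Here is
a rough sketch of those three steps in plain Java collections. It is not Beam
code and not the code from the repo; all names (TaggedUnionJoin, Tagged,
Joined, coGroup) are invented for illustration, and each input element is
assumed to be a two-element array of {key, value}:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TaggedUnionJoin {

    /** One element of the flattened collection: key, source tag (0 or 1), value. */
    public static final class Tagged {
        final String key;
        final int tag;
        final String value;

        Tagged(String key, int tag, String value) {
            this.key = key;
            this.tag = tag;
            this.value = value;
        }
    }

    /** Per-key result with the values of both inputs kept apart by tag. */
    public static final class Joined {
        public final List<String> fromFirst = new ArrayList<>();
        public final List<String> fromSecond = new ArrayList<>();
    }

    /** Inputs are lists of {key, value} pairs; the result has one entry per distinct key. */
    public static Map<String, Joined> coGroup(List<String[]> first, List<String[]> second) {
        // Steps 1 and 2: wrap every value with the tag of its source and
        // put both inputs into a single flattened list.
        List<Tagged> flattened = new ArrayList<>();
        for (String[] kv : first) {
            flattened.add(new Tagged(kv[0], 0, kv[1]));
        }
        for (String[] kv : second) {
            flattened.add(new Tagged(kv[0], 1, kv[1]));
        }

        // Step 3: group by key; the tag decides which side a value lands on.
        Map<String, Joined> grouped = new LinkedHashMap<>();
        for (Tagged t : flattened) {
            Joined joined = grouped.computeIfAbsent(t.key, k -> new Joined());
            (t.tag == 0 ? joined.fromFirst : joined.fromSecond).add(t.value);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String[]> records = Arrays.asList(
                new String[] {"k1", "r1"},
                new String[] {"k1", "r2"},
                new String[] {"k2", "r3"});
        List<String[]> configs =
                Collections.singletonList(new String[] {"k1", "c1"});

        for (Map.Entry<String, Joined> e : coGroup(records, configs).entrySet()) {
            System.out.println(e.getKey() + " records=" + e.getValue().fromFirst
                    + " configs=" + e.getValue().fromSecond);
        }
    }
}
```

In this single-threaded model every key ends up in exactly one Joined, which
is the behaviour I expect from the runner as well.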

2016-05-31 11:55 GMT+02:00 Aljoscha Krettek <al...@apache.org>:

> Does 2. work for the cases where CoGroupByKey fails? Reason I'm asking is
> that CoGroupByKey is essentially implemented like that internally: create
> tagged union -> flatten -> GroupByKey.
>
> On Tue, 31 May 2016 at 01:16 Pawel Szczur <pa...@gmail.com> wrote:
>
>> I've naively tried a few other key types; it seems to be unrelated to
>> the key type.
>>
>> For now I have two workarounds, plus simply ignoring it:
>>  1. If there is one dominant dataset and the other datasets are small
>> (size << GB), then I use SideInput.
>>  2. If I have multiple datasets of similar size, I enclose them in a
>> common container, flatten them and apply GroupByKey.
>>  3. I measure occurrences and ignore the bug for now.
>>
>> Do you have an idea how a test for this could be constructed? It would
>> be handy, I think.
>>
>> I also found two things; maybe they help you:
>>  1. the issue doesn't appear without parallelism
>>  2. the issue doesn't appear with tiny datasets
>>
>> 2016-05-30 17:13 GMT+02:00 Aljoscha Krettek <al...@apache.org>:
>>
>>> You're right. I'm still looking into this, unfortunately I haven't made
>>> progress so far. I'll keep you posted.
>>>
>>> On Sun, 29 May 2016 at 18:20 Pawel Szczur <pa...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I used the config as in the repo.
>>>> Please grep the log for "hereGoesLongStringID0,2"; you will see
>>>> that this key is processed multiple times.
>>>>
>>>> This is how I understand CoGroupByKey: one has two (or more)
>>>> PCollection<KV<K,?>>. Both sets are grouped by key. For each unique key a
>>>> KV<K, CoGbkResult> is produced; a given CoGbkResult contains all values
>>>> from all input PCollections which have the given key.
>>>>
>>>> But from the log it seems that each key produced more than one
>>>> CoGbkResult.
>>>>
>>>> The final counters didn't catch the bug because in your case, the value
>>>> from dataset1 was replicated for each key.
>>>>
>>>> Cheers, Pawel
>>>>
>>>> 2016-05-29 15:59 GMT+02:00 Aljoscha Krettek <al...@apache.org>:
>>>>
>>>>> Hi,
>>>>> I ran your data generator with these configs:
>>>>> p.apply(Create.of(new Config(3, 5, 600_000, 1)))
>>>>>     .apply(ParDo.of(new Generator()))
>>>>>     .apply(AvroIO.Write.to("/tmp/dataset1")
>>>>>         .withSchema(DumbData.class).withNumShards(6));
>>>>>
>>>>> p.apply(Create.of(new Config(3, 5, 600_000, 2)))
>>>>>     .apply(ParDo.of(new Generator()))
>>>>>     .apply(AvroIO.Write.to("/tmp/dataset2")
>>>>>         .withSchema(DumbData.class).withNumShards(6));
>>>>>
>>>>> Then I ran the job with parallelism=6. I couldn't reproduce the
>>>>> problem, this is the log file from one of several runs:
>>>>> https://gist.github.com/aljoscha/ef1d804f57671cd472c75b92b4aee51b
>>>>>
>>>>> Could you please send me the exact config that you used? Btw, I ran it
>>>>> inside an IDE; do the problems also occur in the IDE for you, or only when
>>>>> you execute on a cluster?
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> On Sun, 29 May 2016 at 01:51 Pawel Szczur <pa...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Aljoscha.
>>>>>>
>>>>>> I've created a repo with fake dataset to allow easily reproduce the
>>>>>> problem:
>>>>>> https://github.com/orian/cogroup-wrong-grouping
>>>>>>
>>>>>> What I noticed: if the dataset is too small the bug doesn't appear.
>>>>>>
>>>>>> You can modify the size of dataset, but in ideal case it should be
>>>>>> few hundred thousands records per key (I guess it depends on the machine
>>>>>> you run it).
>>>>>>
>>>>>> Cheers, Pawel
>>>>>>
>>>>>> 2016-05-28 12:45 GMT+02:00 Aljoscha Krettek <al...@apache.org>:
>>>>>>
>>>>>>> Hi,
>>>>>>> which version of Beam/Flink are you using.
>>>>>>>
>>>>>>> Could you maybe also provide example data and code that showcases
>>>>>>> the problem? If you have concerns about sending it to a public list you can
>>>>>>> also send it to me directly.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> On Fri, 27 May 2016 at 20:53 Pawel Szczur <pa...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> *Data description.*
>>>>>>>>
>>>>>>>> I have two datasets.
>>>>>>>>
>>>>>>>> Records - the first, containes around 0.5-1M of records per
>>>>>>>> (key,day). For testing I use 2-3 keys and 5-10 days of data. What I shoot
>>>>>>>> for is 1000+ keys. Each record contains key, timestamp in μ-seconds and
>>>>>>>> some other data.
>>>>>>>> Configs - the second, is rather small. It describes the key in
>>>>>>>> time, e.g. you can think about it as a list of tuples: (key, start date,
>>>>>>>> end date, description).
>>>>>>>>
>>>>>>>> For the exploration I've encoded the data as files of
>>>>>>>> length-prefixed Protocol Buffer binary encoded messages. Additionally the
>>>>>>>> files are packed with gzip. Data is sharded by date. Each file is around
>>>>>>>> 10MB.
>>>>>>>>
>>>>>>>> *Pipeline*
>>>>>>>>
>>>>>>>> First I add keys to both datasets. For Records dataset it's (key,
>>>>>>>> day rounded timestamp). For Configs a key is (key, day), where day is each
>>>>>>>> timestamp value between start date and end date (pointing midnight).
>>>>>>>> The datasets are merged using CoGroupByKey.
>>>>>>>>
>>>>>>>> As a key type I use import org.apache.flink.api.java.tuple.Tuple2
>>>>>>>> with a Tuple2Coder from this repo.
>>>>>>>>
>>>>>>>> *The problem*
>>>>>>>>
>>>>>>>> If the Records dataset is tiny like 5 days, everything seems fine
>>>>>>>> (check normal_run.log).
>>>>>>>>
>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator
>>>>>>>> values:
>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:127) - item count : 4322332
>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 0
>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
>>>>>>>>
>>>>>>>> When I run the pipeline against 10+ days I encounter an error
>>>>>>>> pointing that for some Records there's no Config (wrong_run.log).
>>>>>>>>
>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator
>>>>>>>> values:
>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:127) - item count : 8577197
>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 6
>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
>>>>>>>>
>>>>>>>> Then I've added some extra logging messages:
>>>>>>>>
>>>>>>>> (ConvertToItem.java:144) - 68643 items for KeyValue3 on:
>>>>>>>> 1462665600000000
>>>>>>>> (ConvertToItem.java:140) - no items for KeyValue3 on:
>>>>>>>> 1463184000000000
>>>>>>>> (ConvertToItem.java:123) - missing for KeyValue3 on:
>>>>>>>> 1462924800000000
>>>>>>>> (ConvertToItem.java:142) - 753707 items for KeyValue3 on:
>>>>>>>> 1462924800000000 marked as no-loc
>>>>>>>> (ConvertToItem.java:123) - missing for KeyValue3 on:
>>>>>>>> 1462752000000000
>>>>>>>> (ConvertToItem.java:142) - 749901 items for KeyValue3 on:
>>>>>>>> 1462752000000000 marked as no-loc
>>>>>>>> (ConvertToItem.java:144) - 754578 items for KeyValue3 on:
>>>>>>>> 1462406400000000
>>>>>>>> (ConvertToItem.java:144) - 751574 items for KeyValue3 on:
>>>>>>>> 1463011200000000
>>>>>>>> (ConvertToItem.java:123) - missing for KeyValue3 on:
>>>>>>>> 1462665600000000
>>>>>>>> (ConvertToItem.java:142) - 754758 items for KeyValue3 on:
>>>>>>>> 1462665600000000 marked as no-loc
>>>>>>>> (ConvertToItem.java:123) - missing for KeyValue3 on:
>>>>>>>> 1463184000000000
>>>>>>>> (ConvertToItem.java:142) - 694372 items for KeyValue3 on:
>>>>>>>> 1463184000000000 marked as no-loc
>>>>>>>>
>>>>>>>> You can spot that in first line 68643 items were processed for
>>>>>>>> KeyValue3 and time 1462665600000000.
>>>>>>>> Later on in line 9 it seems the operation processes the same key
>>>>>>>> again, but it reports that no Config was available for these Records.
>>>>>>>> The line 10 informs they've been marked as no-loc.
>>>>>>>>
>>>>>>>> The line 2 is saying that there were no items for KeyValue3 and
>>>>>>>> time 1463184000000000, but in line 11 you can read that the items for this
>>>>>>>> (key,day) pair were processed later and they've lacked a Config.
>>>>>>>>
>>>>>>>> *Work-around (after more testing, doesn't work, staying with
>>>>>>>> Tuple2)*
>>>>>>>>
>>>>>>>> I've switched from using Tuple2 to a Protocol Buffer message:
>>>>>>>>
>>>>>>>> message KeyDay {
>>>>>>>>   optional ByteString key = 1;
>>>>>>>>   optional int64 timestamp_usec = 2;
>>>>>>>> }
>>>>>>>>
>>>>>>>> But using Tuple2.of() was just easier than:
>>>>>>>> KeyDay.newBuilder().setKey(...).setTimestampUsec(...).build().
>>>>>>>>
>>>>>>>> // The original description comes from:
>>>>>>>> http://stackoverflow.com/questions/37473682/items-not-groupped-correctly-cogroupbykey
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>

Re: Items not groupped correctly - CoGroupByKey - FlinkPipelienRunner

Posted by Aljoscha Krettek <al...@apache.org>.
Does workaround 2 work in the cases where CoGroupByKey fails? The reason I'm
asking is that CoGroupByKey is essentially implemented like that internally:
create tagged union -> flatten -> GroupByKey.
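
For readers following along, the "tagged union -> flatten -> GroupByKey" shape
can be modelled in plain Java without any Beam dependency. This is only a
sketch of the idea, not the runner's actual code: the class name
CoGroupSketch, the Tagged and Grouped holders, and the String[] {key, value}
pair encoding are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of "tagged union -> flatten -> GroupByKey".
// It illustrates the idea only; it is not Beam's or Flink's implementation.
class CoGroupSketch {

    // One element of the tagged union: the input it came from plus its key and value.
    static final class Tagged {
        final int tag;
        final String key;
        final String value;

        Tagged(int tag, String key, String value) {
            this.tag = tag;
            this.key = key;
            this.value = value;
        }
    }

    // Per-key result: the values from each input, kept apart by tag.
    static final class Grouped {
        final List<String> first = new ArrayList<>();
        final List<String> second = new ArrayList<>();
    }

    // Each input element is a String[] {key, value} pair (an assumption of this sketch).
    static Map<String, Grouped> coGroup(List<String[]> firstInput, List<String[]> secondInput) {
        // Steps 1 and 2: tag every element with its input index and flatten into one list.
        List<Tagged> flattened = new ArrayList<>();
        for (String[] kv : firstInput) {
            flattened.add(new Tagged(0, kv[0], kv[1]));
        }
        for (String[] kv : secondInput) {
            flattened.add(new Tagged(1, kv[0], kv[1]));
        }

        // Step 3: group by key, so each distinct key ends up with exactly one Grouped.
        Map<String, Grouped> result = new LinkedHashMap<>();
        for (Tagged t : flattened) {
            Grouped g = result.computeIfAbsent(t.key, k -> new Grouped());
            (t.tag == 0 ? g.first : g.second).add(t.value);
        }
        return result;
    }
}
```

In this model a key can never appear twice in the result, which is the
property the thread reports as violated on the parallel runs.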

On Tue, 31 May 2016 at 01:16 Pawel Szczur <pa...@gmail.com> wrote:

> I've naively tried a few other key types; the problem seems to be unrelated
> to the key type.
>
> For now I have two workarounds, plus the option of ignoring the bug:
>  1. If there is one dominant dataset and the other datasets are small (size
> << GB), I use a side input.
>  2. If I have multiple datasets of similar size, I wrap their elements in a
> common container type, flatten them, and apply GroupByKey.
>  3. I measure how often the problem occurs and ignore it for now.
>
> Do you have an idea how a test for this could be constructed? I think it
> would be handy.
>
> I also found two things that may help you:
>  1. The issue doesn't appear without parallelism.
>  2. The issue doesn't appear with tiny datasets.

Re: Items not groupped correctly - CoGroupByKey - FlinkPipelienRunner

Posted by Pawel Szczur <pa...@gmail.com>.
I've naively tried a few other key types; the problem seems to be unrelated
to the key type.

For now I have two workarounds, plus the option of ignoring the bug:
 1. If there is one dominant dataset and the other datasets are small (size
<< GB), I use a side input.
 2. If I have multiple datasets of similar size, I wrap their elements in a
common container type, flatten them, and apply GroupByKey.
 3. I measure how often the problem occurs and ignore it for now.
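
Workaround 1 amounts to a map-side lookup: the small dataset is made
available to every worker and each record of the dominant dataset looks up
its match locally, so no grouping across datasets is needed. The plain-Java
sketch below shows only that shape. In a Beam pipeline the map would come
from a side input (for example a view built with View.asMap()); the class
name SideInputJoinSketch, the "<missing>" marker and the String[] {key, value}
encoding are assumptions of this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the side-input workaround: join by local map lookup.
class SideInputJoinSketch {

    // Index the small dataset by key; this plays the role of the side input.
    static Map<String, String> indexConfigs(List<String[]> configs) {
        Map<String, String> byKey = new HashMap<>();
        for (String[] kv : configs) {
            byKey.put(kv[0], kv[1]);
        }
        return byKey;
    }

    // For each record {key, value}, attach the config for its key,
    // or "<missing>" when the small dataset has no entry for that key.
    static List<String> join(List<String[]> records, Map<String, String> configByKey) {
        List<String> out = new ArrayList<>();
        for (String[] record : records) {
            String config = configByKey.getOrDefault(record[0], "<missing>");
            out.add(record[1] + "|" + config);
        }
        return out;
    }
}
```

The trade-off is memory: this only works while the indexed dataset fits
comfortably on each worker, which matches the "size << GB" condition above.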

Do you have an idea how a test for this could be constructed? I think it
would be handy.
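
One possible shape for such a test, offered as a sketch rather than a ready
Beam test: generate synthetic data whose per-(key, day) counts are known in
advance, run the grouping under test, and compare what was observed with what
was expected. The helpers below cover only the data and the comparison. The
class name SyntheticGroupingData, the "keyN,day" key format and the tally
step (which stands in for the real pipeline) are assumptions of this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helpers for a grouping test with a known expected outcome.
class SyntheticGroupingData {

    // Composite key in a "key,day" text form.
    static String keyDay(String key, int day) {
        return key + "," + day;
    }

    // One composite key per synthetic record, interleaved across keys and days.
    static List<String> recordKeys(int keys, int days, int perKeyDay) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < perKeyDay; i++) {
            for (int k = 0; k < keys; k++) {
                for (int d = 0; d < days; d++) {
                    out.add(keyDay("key" + k, d));
                }
            }
        }
        return out;
    }

    // The record count every (key, day) pair should end up with.
    static Map<String, Integer> expectedCounts(int keys, int days, int perKeyDay) {
        Map<String, Integer> expected = new LinkedHashMap<>();
        for (int k = 0; k < keys; k++) {
            for (int d = 0; d < days; d++) {
                expected.put(keyDay("key" + k, d), perKeyDay);
            }
        }
        return expected;
    }

    // Stand-in for the grouping under test: counts records per composite key.
    static Map<String, Integer> tally(List<String> keys) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String key : keys) {
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    // Keys whose observed count is absent or differs from the expected count.
    static List<String> mismatches(Map<String, Integer> expected, Map<String, Integer> observed) {
        List<String> bad = new ArrayList<>();
        for (Map.Entry<String, Integer> e : expected.entrySet()) {
            if (!e.getValue().equals(observed.get(e.getKey()))) {
                bad.add(e.getKey());
            }
        }
        return bad;
    }
}
```

In a real test, tally would be replaced by the per-group sizes reported by
the pipeline, and the dataset would need to be large enough, and the
parallelism high enough, for the problem to show up at all.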

I also found two things that may help you:
 1. The issue doesn't appear without parallelism.
 2. The issue doesn't appear with tiny datasets.

2016-05-30 17:13 GMT+02:00 Aljoscha Krettek <al...@apache.org>:

> You're right. I'm still looking into this; unfortunately, I haven't made
> progress so far. I'll keep you posted.

Re: Items not groupped correctly - CoGroupByKey - FlinkPipelienRunner

Posted by Aljoscha Krettek <al...@apache.org>.
You're right. I'm still looking into this; unfortunately, I haven't made
progress so far. I'll keep you posted.

On Sun, 29 May 2016 at 18:20 Pawel Szczur <pa...@gmail.com> wrote:

> Hi,
>
> I used the config as in the repo.
> Please grep the log for "hereGoesLongStringID0,2"; you will see that
> this key is processed multiple times.
>
> This is how I understand CoGroupByKey: you have two (or more)
> PCollection<KV<K, ?>> inputs, and all of them are grouped by key. For each
> unique key, exactly one KV<K, CoGbkResult> is produced, and that CoGbkResult
> contains all values from all input PCollections that have the given key.
>
> But from the log it seems that each key produced more than one CoGbkResult.
>
> The final counters didn't catch the bug because in your case, the value
> from dataset1 was replicated for each key.
>
> Cheers, Pawel

Re: Items not groupped correctly - CoGroupByKey - FlinkPipelienRunner

Posted by Pawel Szczur <pa...@gmail.com>.
Hi,

I used the config as in the repo.
Please grep the log for "hereGoesLongStringID0,2"; you will see that
this key is processed multiple times.

This is how I understand CoGroupByKey: you have two (or more)
PCollection<KV<K, ?>> inputs, and all of them are grouped by key. For each
unique key, exactly one KV<K, CoGbkResult> is produced, and that CoGbkResult
contains all values from all input PCollections that have the given key.

But from the log it seems that each key produced more than one CoGbkResult.
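
The symptom described here, one key yielding more than one CoGbkResult, can
be checked mechanically instead of by grepping the log. A minimal sketch,
assuming the keys of all emitted groups have been collected into a list; the
class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical check for the "exactly one result per key" expectation.
class DuplicateGroupCheck {

    // Returns every key that occurs more than once among the emitted group keys,
    // in order of first appearance.
    static List<String> keysEmittedMoreThanOnce(List<String> emittedKeys) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String key : emittedKeys) {
            counts.merge(key, 1, Integer::sum);
        }
        List<String> duplicates = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() > 1) {
                duplicates.add(e.getKey());
            }
        }
        return duplicates;
    }
}
```

An empty result means every key was grouped once; any key in the list was
split across several results.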

The final counters didn't catch the bug because in your case, the value
from dataset1 was replicated for each key.

Cheers, Pawel

2016-05-29 15:59 GMT+02:00 Aljoscha Krettek <al...@apache.org>:

> Hi,
> I ran your data generator with these configs:
>
> p.apply(Create.of(new Config(3, 5, 600_000, 1)))
>     .apply(ParDo.of(new Generator()))
>     .apply(AvroIO.Write.to("/tmp/dataset1")
>         .withSchema(DumbData.class)
>         .withNumShards(6));
>
> p.apply(Create.of(new Config(3, 5, 600_000, 2)))
>     .apply(ParDo.of(new Generator()))
>     .apply(AvroIO.Write.to("/tmp/dataset2")
>         .withSchema(DumbData.class)
>         .withNumShards(6));
>
> Then I ran the job with parallelism=6. I couldn't reproduce the problem;
> this is the log file from one of several runs:
> https://gist.github.com/aljoscha/ef1d804f57671cd472c75b92b4aee51b
>
> Could you please send me the exact config that you used? Btw, I ran it
> inside an IDE. Do the problems also occur in the IDE for you, or only when
> you execute on a cluster?
>
> Cheers,
> Aljoscha
>

Re: Items not groupped correctly - CoGroupByKey - FlinkPipelienRunner

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I ran your data generator with these configs:
p.apply(Create.of(new Config(3, 5, 600_000, 1)))
    .apply(ParDo.of(new Generator()))
    .apply(AvroIO.Write.to("/tmp/dataset1")
        .withSchema(DumbData.class)
        .withNumShards(6));

p.apply(Create.of(new Config(3, 5, 600_000, 2)))
    .apply(ParDo.of(new Generator()))
    .apply(AvroIO.Write.to("/tmp/dataset2")
        .withSchema(DumbData.class)
        .withNumShards(6));

Then I ran the job with parallelism=6. I couldn't reproduce the problem;
this is the log file from one of several runs:
https://gist.github.com/aljoscha/ef1d804f57671cd472c75b92b4aee51b

Could you please send me the exact config that you used? Btw, I ran it
inside an IDE. Do the problems also occur in the IDE for you, or only when
you execute on a cluster?

Cheers,
Aljoscha

On Sun, 29 May 2016 at 01:51 Pawel Szczur <pa...@gmail.com> wrote:

> Hi Aljoscha.
>
> I've created a repo with a fake dataset to make the problem easy to
> reproduce:
> https://github.com/orian/cogroup-wrong-grouping
>
> What I noticed: if the dataset is too small, the bug doesn't appear.
>
> You can modify the size of the dataset, but ideally it should be a few
> hundred thousand records per key (I guess it depends on the machine you
> run it on).
>
> Cheers, Pawel

Re: Items not groupped correctly - CoGroupByKey - FlinkPipelienRunner

Posted by Pawel Szczur <pa...@gmail.com>.
Hi Aljoscha.

I've created a repo with a fake dataset to make the problem easy to reproduce:
https://github.com/orian/cogroup-wrong-grouping

What I noticed: if the dataset is too small, the bug doesn't appear.

You can modify the size of the dataset, but ideally it should be a few
hundred thousand records per key (I guess it depends on the machine you
run it on).

Cheers, Pawel

2016-05-28 12:45 GMT+02:00 Aljoscha Krettek <al...@apache.org>:

> Hi,
> which version of Beam/Flink are you using?
>
> Could you maybe also provide example data and code that showcases the
> problem? If you have concerns about sending it to a public list you can
> also send it to me directly.
>
> Cheers,
> Aljoscha

Re: Items not groupped correctly - CoGroupByKey - FlinkPipelienRunner

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
which version of Beam/Flink are you using?

Could you maybe also provide example data and code that showcases the
problem? If you have concerns about sending it to a public list you can
also send it to me directly.

Cheers,
Aljoscha

On Fri, 27 May 2016 at 20:53 Pawel Szczur <pa...@gmail.com> wrote:

> *Data description.*
>
> I have two datasets.
>
> Records - the first, contains around 0.5-1M records per (key, day). For
> testing I use 2-3 keys and 5-10 days of data. What I'm aiming for is 1000+
> keys. Each record contains a key, a timestamp in microseconds and some
> other data.
> Configs - the second, is rather small. It describes each key over time,
> e.g. you can think of it as a list of tuples: (key, start date, end date,
> description).
>
> For the exploration I've encoded the data as files of length-prefixed
> Protocol Buffer binary encoded messages. Additionally the files are packed
> with gzip. Data is sharded by date. Each file is around 10MB.
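
For readers who want to generate or inspect files of this shape without the original tooling, here is a minimal sketch of gzip-compressed, length-prefixed binary records. It is an illustration only: the 4-byte big-endian length prefix is an assumption (the post does not say which prefix encoding is used), the payloads are opaque byte arrays rather than Protocol Buffer messages, and the class name LengthPrefixedGzip is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper: not taken from the original pipeline.
final class LengthPrefixedGzip {

    /** Writes each record as [4-byte length][payload] into one gzip stream. */
    static byte[] write(List<byte[]> records) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(new GZIPOutputStream(buffer))) {
            for (byte[] record : records) {
                out.writeInt(record.length); // assumed prefix format
                out.write(record);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The gzip trailer is written when the stream is closed above.
        return buffer.toByteArray();
    }

    /** Reads records back until the end of the gzip stream. */
    static List<byte[]> read(byte[] gzipped) {
        List<byte[]> records = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(
                new GZIPInputStream(new ByteArrayInputStream(gzipped)))) {
            while (true) {
                int length;
                try {
                    length = in.readInt();
                } catch (EOFException endOfStream) {
                    break; // no further length prefix: treated as end of file
                }
                byte[] record = new byte[length];
                in.readFully(record); // throws EOFException on a truncated payload
                records.add(record);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records;
    }
}
```

Calling LengthPrefixedGzip.read(LengthPrefixedGzip.write(records)) should return the same payloads in the same order, which makes it a quick way to check a generator before feeding its output to a pipeline.
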
>
> *Pipeline*
>
> First I add keys to both datasets. For the Records dataset the key is (key,
> timestamp rounded to the day). For Configs the key is (key, day), where day
> takes each midnight timestamp between start date and end date.
> The datasets are merged using CoGroupByKey.
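
The keying step described here can be sketched in plain Java, independent of Beam or Flink. This is an illustration under stated assumptions, not the code from the repo: timestamps are microseconds since the epoch, "rounded to the day" is taken to mean rounded down to midnight UTC, the Config date range is treated as inclusive at both ends, and the names DayKeys, floorToDayMicros and daysBetween are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: not taken from the original pipeline.
final class DayKeys {
    static final long MICROS_PER_DAY = 86_400_000_000L;

    /** Rounds a microsecond timestamp down to midnight (UTC) of the same day. */
    static long floorToDayMicros(long timestampMicros) {
        return Math.floorDiv(timestampMicros, MICROS_PER_DAY) * MICROS_PER_DAY;
    }

    /** Lists the midnight timestamp of every day from start to end (assumed inclusive). */
    static List<Long> daysBetween(long startMicros, long endMicros) {
        List<Long> days = new ArrayList<>();
        long lastDay = floorToDayMicros(endMicros);
        for (long day = floorToDayMicros(startMicros); day <= lastDay; day += MICROS_PER_DAY) {
            days.add(day);
        }
        return days;
    }

    public static void main(String[] args) {
        // A Record is keyed by (key, floorToDayMicros(timestamp)).
        System.out.println(floorToDayMicros(1462665600000123L));
        // A Config is emitted once per day in its range, keyed by (key, day).
        System.out.println(daysBetween(1462665600000000L, 1462924800000000L));
    }
}
```

With this scheme a Record and a Config meet in the same CoGroupByKey group only if both sides produce exactly the same (key, day) pair, so both sides must use the same rounding function.
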
>
> As the key type I use org.apache.flink.api.java.tuple.Tuple2 with a
> Tuple2Coder from this repo.
>
> *The problem*
>
> If the Records dataset is small, e.g. 5 days, everything seems fine (check
> normal_run.log).
>
>  INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
>  INFO [main] (FlinkPipelineRunner.java:127) - item count : 4322332
>  INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 0
>  INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
>
> When I run the pipeline against 10+ days I encounter an error indicating
> that for some Records there's no Config (wrong_run.log).
>
>  INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
>  INFO [main] (FlinkPipelineRunner.java:127) - item count : 8577197
>  INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 6
>  INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
>
> Then I added some extra logging messages:
>
> (ConvertToItem.java:144) - 68643 items for KeyValue3 on: 1462665600000000
> (ConvertToItem.java:140) - no items for KeyValue3 on: 1463184000000000
> (ConvertToItem.java:123) - missing for KeyValue3 on: 1462924800000000
> (ConvertToItem.java:142) - 753707 items for KeyValue3 on: 1462924800000000 marked as no-loc
> (ConvertToItem.java:123) - missing for KeyValue3 on: 1462752000000000
> (ConvertToItem.java:142) - 749901 items for KeyValue3 on: 1462752000000000 marked as no-loc
> (ConvertToItem.java:144) - 754578 items for KeyValue3 on: 1462406400000000
> (ConvertToItem.java:144) - 751574 items for KeyValue3 on: 1463011200000000
> (ConvertToItem.java:123) - missing for KeyValue3 on: 1462665600000000
> (ConvertToItem.java:142) - 754758 items for KeyValue3 on: 1462665600000000 marked as no-loc
> (ConvertToItem.java:123) - missing for KeyValue3 on: 1463184000000000
> (ConvertToItem.java:142) - 694372 items for KeyValue3 on: 1463184000000000 marked as no-loc
>
> In the first line you can see that 68643 items were processed for KeyValue3
> and time 1462665600000000.
> Later, in line 9, the operation seems to process the same key again,
> but it reports that no Config was available for these Records.
> Line 10 says they've been marked as no-loc.
>
> Line 2 says that there were no items for KeyValue3 and time
> 1463184000000000, but line 11 shows that the items for this
> (key, day) pair were processed later and lacked a Config.
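
The symptom described here, one (key, day) pair reaching the grouping function more than once, can be checked mechanically instead of by reading the log. The sketch below is a hypothetical local diagnostic, not part of the pipeline: GroupAudit, record and splitGroups are made-up names, and it assumes all invocations are observed in one JVM (as when running inside an IDE). It counts invocations per pair and reports the pairs seen more than once.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical diagnostic: not taken from the original pipeline.
final class GroupAudit {
    private final Map<String, Integer> invocations = new HashMap<>();

    /** Call once per invocation of the function that consumes a co-grouped result. */
    void record(String key, long dayMicros) {
        invocations.merge(key + "@" + dayMicros, 1, Integer::sum);
    }

    /** Returns the (key, day) pairs that were delivered as more than one group. */
    Set<String> splitGroups() {
        Set<String> split = new TreeSet<>();
        for (Map.Entry<String, Integer> entry : invocations.entrySet()) {
            if (entry.getValue() > 1) {
                split.add(entry.getKey());
            }
        }
        return split;
    }

    public static void main(String[] args) {
        // The days below are the ones from the log excerpt, one entry per group.
        long[] daysInLogOrder = {
            1462665600000000L, 1463184000000000L, 1462924800000000L, 1462752000000000L,
            1462406400000000L, 1463011200000000L, 1462665600000000L, 1463184000000000L
        };
        GroupAudit audit = new GroupAudit();
        for (long day : daysInLogOrder) {
            audit.record("KeyValue3", day);
        }
        System.out.println(audit.splitGroups());
    }
}
```

A correct grouping should leave splitGroups() empty. A non-empty result means the two inputs did not end up under one key, which is the first thing to confirm before looking at the key type or its coder.
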
>
> *Work-around (after more testing, doesn't work, staying with Tuple2)*
>
> I've switched from using Tuple2 to a Protocol Buffer message:
>
> message KeyDay {
>   optional bytes key = 1;
>   optional int64 timestamp_usec = 2;
> }
>
> But using Tuple2.of() was just easier than:
> KeyDay.newBuilder().setKey(...).setTimestampUsec(...).build().
>
> // The original description comes from:
> http://stackoverflow.com/questions/37473682/items-not-groupped-correctly-cogroupbykey
>