Posted to user@flink.apache.org by Will Bastian <wi...@gmail.com> on 2018/08/09 16:12:41 UTC

Dataset.distinct - Question on deterministic results

I'm working with a data set that poses two challenges:

   1. A single key may have multiple entries
   and
   2. For a single key, there may be multiple unique value-tuples

For example:
key, val1, val2, val3
1,      0,    0,    0
1,      0,    0,    0
1,      1,    0,    0
2,      1,    1,    1
2,      1,    1,    1
2,      1,    1,    0
1,      0,    0,    0

When I execute mySet.distinct(_.key) on the above, my final results suggest
that distinct doesn't always pull the same record/value-tuple on every run.

I understand that the use of distinct outlined above isn't optimal (we
don't know or care which value-tuple we get; we just want it to be
consistent on each run). I wanted to validate whether what I believe I'm
observing is accurate. Specifically, in this example, is Flink reducing by
key with no concern for the values, so that each distinct call may return a
different instance?

Thanks,
Will

Re: Dataset.distinct - Question on deterministic results

Posted by Will Bastian <wi...@gmail.com>.
Fabian,
Thanks for the clear response. You addressed my question, and the
suggestions provide clear context on how to address it.

Best,
Will


On Fri, Aug 10, 2018 at 5:52 AM Fabian Hueske <fh...@gmail.com> wrote:

> Hi Will,
>
> The distinct operator is implemented as a groupBy(distinctKeys) and a
> ReduceFunction that returns the first argument.
> Hence, it depends on the order in which the records are processed by the
> ReduceFunction.
>
> Flink does not maintain a deterministic order because doing so is quite
> expensive in distributed systems.
> Several aspects result in a nondeterministic order:
> - lazy split assignment
> - combiners (which are automatically added for ReduceFunctions)
> - network shuffles
>
> There are two ways to address this issue:
> 1) Fully sort the input of the combiners and reducers on all attributes.
> 2) Use a custom ReduceFunction that compares both input records on all
> (non-distinct-key) fields to determine which record to return.
>
> I would go for the second approach because it is more efficient (no need
> to fully sort before the combiner).
>
> Best, Fabian

Re: Dataset.distinct - Question on deterministic results

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Will,

The distinct operator is implemented as a groupBy(distinctKeys) and a
ReduceFunction that returns the first argument.
Hence, it depends on the order in which the records are processed by the
ReduceFunction.
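
To make the order dependence concrete, here is a minimal sketch using plain
Scala collections (no Flink dependency). The Rec case class and the
distinctByKey helper are hypothetical names that only model the
"group, then keep the first argument" behavior described above; this is
not Flink's actual implementation.

```scala
// Hypothetical record type mirroring the example data (key, val1, val2, val3).
final case class Rec(key: Int, val1: Int, val2: Int, val3: Int)

// Local model of distinct-by-key: group by key, then reduce each group by
// always keeping the first argument. This is a simulation, not Flink code.
def distinctByKey(records: Seq[Rec]): Map[Int, Rec] =
  records.groupBy(_.key).map { case (k, group) =>
    k -> group.reduce((first, _) => first)
  }

val arrivalOrderA = Seq(
  Rec(1, 0, 0, 0), Rec(1, 1, 0, 0), Rec(2, 1, 1, 1), Rec(2, 1, 1, 0)
)
// Same records, different arrival order (as could happen after a shuffle).
val arrivalOrderB = arrivalOrderA.reverse

val resultA = distinctByKey(arrivalOrderA)
val resultB = distinctByKey(arrivalOrderB)

// The surviving record per key differs between the two arrival orders.
println(resultA)
println(resultB)
```

With the same records in a different arrival order, a different value-tuple
survives for each key, which matches the behavior Will observed.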

Flink does not maintain a deterministic order because doing so is quite
expensive in distributed systems.
Several aspects result in a nondeterministic order:
- lazy split assignment
- combiners (which are automatically added for ReduceFunctions)
- network shuffles

There are two ways to address this issue:
1) Fully sort the input of the combiners and reducers on all attributes.
2) Use a custom ReduceFunction that compares both input records on all
(non-distinct-key) fields to determine which record to return.

I would go for the second approach because it is more efficient (no need to
fully sort before the combiner).
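
A minimal sketch of the second approach, again using plain Scala
collections so that it runs without Flink. The names Rec, byValues,
pickSmallest and distinctByKey are hypothetical, and "keep the smallest
value-tuple" is just one possible rule; any total order over the non-key
fields would work the same way.

```scala
// Hypothetical record type mirroring the example data (key, val1, val2, val3).
final case class Rec(key: Int, val1: Int, val2: Int, val3: Int)

// Total order over the non-key fields (lexicographic on val1, val2, val3).
val byValues: Ordering[Rec] =
  Ordering.by((r: Rec) => (r.val1, r.val2, r.val3))

// Deterministic choice between two records of the same key: keep the smaller
// one. Taking a minimum is commutative and associative, so the outcome does
// not depend on the order in which records are combined.
def pickSmallest(a: Rec, b: Rec): Rec = byValues.min(a, b)

// Local model of "group by key, then reduce with the custom function".
def distinctByKey(records: Seq[Rec]): Map[Int, Rec] =
  records.groupBy(_.key).map { case (k, group) =>
    k -> group.reduce((a, b) => pickSmallest(a, b))
  }

val records = Seq(
  Rec(1, 0, 0, 0), Rec(1, 0, 0, 0), Rec(1, 1, 0, 0),
  Rec(2, 1, 1, 1), Rec(2, 1, 1, 1), Rec(2, 1, 1, 0),
  Rec(1, 0, 0, 0)
)

// Try every distinct arrival order and collect the distinct outcomes.
val outcomes = records.permutations.map(distinctByKey).toSet
println(outcomes)
```

Every arrival order yields the same result here. In the Scala DataSet API,
the same function could presumably be plugged in as
mySet.groupBy(_.key).reduce((a, b) => pickSmallest(a, b)) in place of
distinct; treat that call shape as an assumption to verify against your
Flink version.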

Best, Fabian
