You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Fridtjof Sander <fs...@mailbox.tu-berlin.de> on 2016/02/01 11:32:46 UTC

join with no element appearing in multiple join-pairs

Hi,

I have a problem which seems to be unsolvable in Flink at the moment (1.0-Snapshot, current master branch)
and I would kindly ask for some input, ideas on alternative approaches or just a confirmatory "yup, that doesn't work".

### Here's the situation:

I have a dataset and its elements are totally ascending sorted by some 
key (Int). Each element has a "next-pointer" to its successor, which is 
just another field with the key of the following element: x0 -> x1 -> x2 
-> x3 -> ... -> xn The keys are not necessarily increasing by 1, so it 
may be that: x0 has key 2 and x1 has key 10, x2 has 11, x3 has 25 and so 
on. I need to process that set in the following way: iterate: find all 
pairs of elements where "next == key" BUT make sure no element appears 
in multiple pairs example: do pair (x0, x1), (x2, x3), (x4, x5), ... but 
don't pair (x1, x2), (x3, x4), ... then, if some condition is met, 
combine a pair run above procedure again with switched 
pairing-condition: example: do pair (x1, x2), (x3, x4), (x5, x6), ... do 
not pair (x0, x1), (x2, x3), .. I hope the problem is clear... ### Now 
my approach: pseudo-scala-code:

val indexed = input.zipWithIndex val flagged = indexed.map((i, el) => 
el.setFlag(i % 2 == 0)) val left = flagged.filter(el => el.flag)
val right = flagged.filter(el => !el.flag) left.fullOuterJoin(right) 
.where(el.next) .equalTo(el.key) ... I attach my elements with a 
temporary key, that is increasing by 1, with zipWithIndex. Then, I map 
that tempKey to a boolean joinFlag: true if key is even, false if key is 
odd. Then I filter all elements with true, and put them in a dataset 
that is the left side of the next == key join. The right side are all 
elements with flag == false In the second run, I switch the flag 
construction to el.setFlag(i % 2 != 0). That actually works, there is 
only one problem: ### The problem: In my approach, I must not loose the 
total ordering of the data, because only if that ordering is preserved, 
the assignment of alternating join-flags works. Initially it is done by 
range-partitioning and partition-sorting. However, that ordering is 
destroyed, when data is shuffled for the join. And I can not restore it, 
because I have to run the whole thing in an iteration, and 
range-partitioning is not supported within iterations. ### Help? It 
sounds all very complicated, but the only thing I really have to solve 
is that join without any element appearing in multiple pairs (as 
described in "the situation"). If anyone has any idea how to solve this, 
that person would make my day so hard... Anyways, thanks for your time! 
Best, Fridtjof


Re: join with no element appearing in multiple join-pairs

Posted by Fridtjof Sander <fs...@mailbox.tu-berlin.de>.
I still have them.

To be clear: I do need to compare each element with its successor, I 
just can't have one element to be paired multiple times in the same 
step. That's why I divide the join into two steps:

data: x0 -> x1 -> x2

The first join will only pair (x0, x1). These may or may not be combined 
to one element. However, x2 will be in the result-set (of the first 
join) anyway: Elements will only be removed if they are paired with 
others and combined with them.

If x0 and x1 are combined to x12, then the second join will pair (x12, 
x2). If not, it will pair (x1, x2) and x0 will be in the result-set (of 
the second join).

This two-step join happens inside of an iteration.

If I wouldn't have this two-step join, I would pair (x0, x1) and (x1, 
x2). In the event that both are combined, x1 is used twice to build a 
new element, and that must not happen.


Am 1. Februar 2016 13:00:05 MEZ, schrieb Till Rohrmann 
<tr...@apache.org>:

    In the described case, can it be that you still have elements with
    `id % 2 == 1` in your data set or are they filtered out? If they are
    filtered out, then you can simply shift the indices for each
    iteration to the right.

    On Mon, Feb 1, 2016 at 12:32 PM, Fridtjof Sander
    <fsander@mailbox.tu-berlin.de <ma...@mailbox.tu-berlin.de>>
    wrote:

        Hi Till,

        thanks for your reply!

        The problem with that is, that I sometimes combine two elements:

        So from x0 -> x1 -> x2 I join (x0, x1) which might become x0 ->
        x2 in the end.

        The indices from zipWithIndex then are 0 and 2, resulting in
        equal joins flags. Sequential elements always have to have
        alternating flags, which gets violated here.

        Best
        Fridtjof

        Am 01.02.16 um 12:26 schrieb Till Rohrmann:
>
>         Hi Fridtjof,
>
>         I might miss something, but can’t you assign the ids once
>         before starting the iteration and then reuse them throughout
>         the iterations? Of course you would have to add another field
>         to your input data but then you don’t have to run the
>         |zipWithIndex| for every iteration.
>
>         Cheers,
>         Till
>
>         ​
>
>         On Mon, Feb 1, 2016 at 11:37 AM, Fridtjof Sander
>         <fsander@mailbox.tu-berlin.de
>         <ma...@mailbox.tu-berlin.de>> wrote:
>
>             (tried to reformat)
>
>
>             Hi,
>
>             I have a problem which seems to be unsolvable in Flink at
>             the moment (1.0-Snapshot, current master branch)
>             and I would kindly ask for some input, ideas on
>             alternative approaches or just a confirmatory "yup, that
>             doesn't work".
>
>             ### Here's the situation:
>
>             I have a dataset and its elements are totally ascending
>             sorted by some key (Int). Each element has a
>             "next-pointer" to its successor, which is just another
>             field with the key of the following element:
>
>             x0 -> x1 -> x2 -> x3 -> ... -> xn
>
>             The keys are not necessarily increasing by 1, so it may be
>             that: x0 has key 2 and x1 has key 10, x2 has 11, x3 has 25
>             and so on. I need to process that set in the following way:
>
>             iterate:
>
>             find all pairs of elements where "next == key" BUT make
>             sure no element appears in multiple pairs
>
>             example: do pair (x0, x1), (x2, x3), (x4, x5), ... but
>             don't pair (x1, x2), (x3, x4), ...
>
>             then, if some condition is met, combine a pair
>
>             run above procedure again with switched pairing-condition:
>
>             example: do pair (x1, x2), (x3, x4), (x5, x6), ... do not
>             pair (x0, x1), (x2, x3), ..
>
>             I hope the problem is clear...
>
>
>             ### Now my approach: pseudo-scala-code:
>
>
>             val indexed = input.zipWithIndex
>
>             val flagged = indexed.map((i, el) => el.setFlag(i % 2 == 0))
>
>             val left = flagged.filter(el => el.flag)
>
>             val right = flagged.filter(el => !el.flag)
>
>             left.fullOuterJoin(right)
>
>              .where(el.next)
>
>              .equalTo(el.key)
>
>              ...
>
>
>             I attach my elements with a temporary key, that is
>             increasing by 1, with zipWithIndex. Then, I map that
>             tempKey to a boolean joinFlag: true if key is even, false
>             if key is odd. Then I filter all elements with true, and
>             put them in a dataset that is the left side of the next ==
>             key join. The right side are all elements with flag ==
>             false In the second run, I switch the flag construction to
>             el.setFlag(i % 2 != 0).
>
>             That actually works, there is only one problem:
>
>
>             ### The problem:
>
>
>             In my approach, I must not loose the total ordering of the
>             data, because only if that ordering is preserved, the
>             assignment of alternating join-flags works. Initially it
>             is done by range-partitioning and partition-sorting.
>             However, that ordering is destroyed, when data is shuffled
>             for the join. And I can not restore it, because I have to
>             run the whole thing in an iteration, and
>             range-partitioning is not supported within iterations.
>
>
>             ### Help?
>
>             It sounds all very complicated, but the only thing I
>             really have to solve is that join without any element
>             appearing in multiple pairs (as described in "the
>             situation"). If anyone has any idea how to solve this,
>             that person would make my day so hard...
>
>             Anyways, thanks for your time!
>
>             Best, Fridtjof
>
>
>
>             Am 01.02.16 um 11:32 schrieb Fridtjof Sander:
>>             Hi,
>>
>>             I have a problem which seems to be unsolvable in Flink at
>>             the moment (1.0-Snapshot, current master branch)
>>             and I would kindly ask for some input, ideas on
>>             alternative approaches or just a confirmatory "yup, that
>>             doesn't work".
>>
>>             ### Here's the situation:
>>
>>             I have a dataset and its elements are totally ascending
>>             sorted by some key (Int). Each element has a
>>             "next-pointer" to its successor, which is just another
>>             field with the key of the following element: x0 -> x1 ->
>>             x2 -> x3 -> ... -> xn The keys are not necessarily
>>             increasing by 1, so it may be that: x0 has key 2 and x1
>>             has key 10, x2 has 11, x3 has 25 and so on. I need to
>>             process that set in the following way: iterate: find all
>>             pairs of elements where "next == key" BUT make sure no
>>             element appears in multiple pairs example: do pair (x0,
>>             x1), (x2, x3), (x4, x5), ... but don't pair (x1, x2),
>>             (x3, x4), ... then, if some condition is met, combine a
>>             pair run above procedure again with switched
>>             pairing-condition: example: do pair (x1, x2), (x3, x4),
>>             (x5, x6), ... do not pair (x0, x1), (x2, x3), .. I hope
>>             the problem is clear... ### Now my approach:
>>             pseudo-scala-code:
>>
>>             val indexed = input.zipWithIndex val flagged =
>>             indexed.map((i, el) => el.setFlag(i % 2 == 0)) val left =
>>             flagged.filter(el => el.flag)
>>             val right = flagged.filter(el => !el.flag)
>>             left.fullOuterJoin(right) .where(el.next)
>>             .equalTo(el.key) ... I attach my elements with a
>>             temporary key, that is increasing by 1, with
>>             zipWithIndex. Then, I map that tempKey to a boolean
>>             joinFlag: true if key is even, false if key is odd. Then
>>             I filter all elements with true, and put them in a
>>             dataset that is the left side of the next == key join.
>>             The right side are all elements with flag == false In the
>>             second run, I switch the flag construction to
>>             el.setFlag(i % 2 != 0). That actually works, there is
>>             only one problem: ### The problem: In my approach, I must
>>             not loose the total ordering of the data, because only if
>>             that ordering is preserved, the assignment of alternating
>>             join-flags works. Initially it is done by
>>             range-partitioning and partition-sorting. However, that
>>             ordering is destroyed, when data is shuffled for the
>>             join. And I can not restore it, because I have to run the
>>             whole thing in an iteration, and range-partitioning is
>>             not supported within iterations. ### Help? It sounds all
>>             very complicated, but the only thing I really have to
>>             solve is that join without any element appearing in
>>             multiple pairs (as described in "the situation"). If
>>             anyone has any idea how to solve this, that person would
>>             make my day so hard... Anyways, thanks for your time!
>>             Best, Fridtjof
>>
>
>



Re: join with no element appearing in multiple join-pairs

Posted by Till Rohrmann <tr...@apache.org>.
In the described case, can it be that you still have elements with `id % 2
== 1` in your data set or are they filtered out? If they are filtered out,
then you can simply shift the indices for each iteration to the right.

On Mon, Feb 1, 2016 at 12:32 PM, Fridtjof Sander <
fsander@mailbox.tu-berlin.de> wrote:

> Hi Till,
>
> thanks for your reply!
>
> The problem with that is, that I sometimes combine two elements:
>
> So from x0 -> x1 -> x2 I join (x0, x1) which might become x0 -> x2 in the
> end.
>
> The indices from zipWithIndex then are 0 and 2, resulting in equal joins
> flags. Sequential elements always have to have alternating flags, which
> gets violated here.
>
> Best
> Fridtjof
>
> Am 01.02.16 um 12:26 schrieb Till Rohrmann:
>
> Hi Fridtjof,
>
> I might miss something, but can’t you assign the ids once before starting
> the iteration and then reuse them throughout the iterations? Of course you
> would have to add another field to your input data but then you don’t have
> to run the zipWithIndex for every iteration.
>
> Cheers,
> Till
> ​
>
> On Mon, Feb 1, 2016 at 11:37 AM, Fridtjof Sander <
> fsander@mailbox.tu-berlin.de> wrote:
>
>> (tried to reformat)
>>
>>
>> Hi,
>>
>> I have a problem which seems to be unsolvable in Flink at the moment
>> (1.0-Snapshot, current master branch)
>> and I would kindly ask for some input, ideas on alternative approaches or
>> just a confirmatory "yup, that doesn't work".
>>
>> ### Here's the situation:
>>
>> I have a dataset and its elements are totally ascending sorted by some
>> key (Int). Each element has a "next-pointer" to its successor, which is
>> just another field with the key of the following element:
>>
>> x0 -> x1 -> x2 -> x3 -> ... -> xn
>>
>> The keys are not necessarily increasing by 1, so it may be that: x0 has
>> key 2 and x1 has key 10, x2 has 11, x3 has 25 and so on. I need to process
>> that set in the following way:
>>
>> iterate:
>>
>> find all pairs of elements where "next == key" BUT make sure no element
>> appears in multiple pairs
>>
>> example: do pair (x0, x1), (x2, x3), (x4, x5), ... but don't pair (x1,
>> x2), (x3, x4), ...
>>
>> then, if some condition is met, combine a pair
>>
>> run above procedure again with switched pairing-condition:
>>
>> example: do pair (x1, x2), (x3, x4), (x5, x6), ... do not pair (x0, x1),
>> (x2, x3), ..
>>
>> I hope the problem is clear...
>>
>>
>> ### Now my approach: pseudo-scala-code:
>>
>>
>> val indexed = input.zipWithIndex
>>
>> val flagged = indexed.map((i, el) => el.setFlag(i % 2 == 0))
>>
>> val left = flagged.filter(el => el.flag)
>>
>> val right = flagged.filter(el => !el.flag)
>>
>> left.fullOuterJoin(right)
>>
>>  .where(el.next)
>>
>>  .equalTo(el.key)
>>
>>  ...
>>
>>
>> I attach my elements with a temporary key, that is increasing by 1, with
>> zipWithIndex. Then, I map that tempKey to a boolean joinFlag: true if key
>> is even, false if key is odd. Then I filter all elements with true, and put
>> them in a dataset that is the left side of the next == key join. The right
>> side are all elements with flag == false In the second run, I switch the
>> flag construction to el.setFlag(i % 2 != 0).
>>
>> That actually works, there is only one problem:
>>
>>
>> ### The problem:
>>
>>
>> In my approach, I must not loose the total ordering of the data, because
>> only if that ordering is preserved, the assignment of alternating
>> join-flags works. Initially it is done by range-partitioning and
>> partition-sorting. However, that ordering is destroyed, when data is
>> shuffled for the join. And I can not restore it, because I have to run the
>> whole thing in an iteration, and range-partitioning is not supported within
>> iterations.
>>
>>
>> ### Help?
>>
>> It sounds all very complicated, but the only thing I really have to solve
>> is that join without any element appearing in multiple pairs (as described
>> in "the situation"). If anyone has any idea how to solve this, that person
>> would make my day so hard...
>>
>> Anyways, thanks for your time!
>>
>> Best, Fridtjof
>>
>>
>>
>> Am 01.02.16 um 11:32 schrieb Fridtjof Sander:
>>
>> Hi,
>>
>> I have a problem which seems to be unsolvable in Flink at the moment
>> (1.0-Snapshot, current master branch)
>> and I would kindly ask for some input, ideas on alternative approaches or
>> just a confirmatory "yup, that doesn't work".
>>
>> ### Here's the situation:
>>
>> I have a dataset and its elements are totally ascending sorted by some
>> key (Int). Each element has a "next-pointer" to its successor, which is
>> just another field with the key of the following element: x0 -> x1 -> x2 ->
>> x3 -> ... -> xn The keys are not necessarily increasing by 1, so it may be
>> that: x0 has key 2 and x1 has key 10, x2 has 11, x3 has 25 and so on. I
>> need to process that set in the following way: iterate: find all pairs of
>> elements where "next == key" BUT make sure no element appears in multiple
>> pairs example: do pair (x0, x1), (x2, x3), (x4, x5), ... but don't pair
>> (x1, x2), (x3, x4), ... then, if some condition is met, combine a pair run
>> above procedure again with switched pairing-condition: example: do pair
>> (x1, x2), (x3, x4), (x5, x6), ... do not pair (x0, x1), (x2, x3), .. I hope
>> the problem is clear... ### Now my approach: pseudo-scala-code:
>>
>> val indexed = input.zipWithIndex val flagged = indexed.map((i, el) =>
>> el.setFlag(i % 2 == 0)) val left = flagged.filter(el => el.flag)
>> val right = flagged.filter(el => !el.flag) left.fullOuterJoin(right)
>> .where(el.next) .equalTo(el.key) ... I attach my elements with a temporary
>> key, that is increasing by 1, with zipWithIndex. Then, I map that tempKey
>> to a boolean joinFlag: true if key is even, false if key is odd. Then I
>> filter all elements with true, and put them in a dataset that is the left
>> side of the next == key join. The right side are all elements with flag ==
>> false In the second run, I switch the flag construction to el.setFlag(i % 2
>> != 0). That actually works, there is only one problem: ### The problem: In
>> my approach, I must not loose the total ordering of the data, because only
>> if that ordering is preserved, the assignment of alternating join-flags
>> works. Initially it is done by range-partitioning and partition-sorting.
>> However, that ordering is destroyed, when data is shuffled for the join.
>> And I can not restore it, because I have to run the whole thing in an
>> iteration, and range-partitioning is not supported within iterations. ###
>> Help? It sounds all very complicated, but the only thing I really have to
>> solve is that join without any element appearing in multiple pairs (as
>> described in "the situation"). If anyone has any idea how to solve this,
>> that person would make my day so hard... Anyways, thanks for your time!
>> Best, Fridtjof
>>
>>
>>
>
>

Re: join with no element appearing in multiple join-pairs

Posted by Fridtjof Sander <fs...@mailbox.tu-berlin.de>.
Hi Till,

thanks for your reply!

The problem with that is, that I sometimes combine two elements:

So from x0 -> x1 -> x2 I join (x0, x1) which might become x0 -> x2 in 
the end.

The indices from zipWithIndex then are 0 and 2, resulting in equal joins 
flags. Sequential elements always have to have alternating flags, which 
gets violated here.

Best
Fridtjof

Am 01.02.16 um 12:26 schrieb Till Rohrmann:
>
> Hi Fridtjof,
>
> I might miss something, but can’t you assign the ids once before 
> starting the iteration and then reuse them throughout the iterations? 
> Of course you would have to add another field to your input data but 
> then you don’t have to run the |zipWithIndex| for every iteration.
>
> Cheers,
> Till
>
> ​
>
> On Mon, Feb 1, 2016 at 11:37 AM, Fridtjof Sander 
> <fsander@mailbox.tu-berlin.de <ma...@mailbox.tu-berlin.de>> 
> wrote:
>
>     (tried to reformat)
>
>
>     Hi,
>
>     I have a problem which seems to be unsolvable in Flink at the
>     moment (1.0-Snapshot, current master branch)
>     and I would kindly ask for some input, ideas on alternative
>     approaches or just a confirmatory "yup, that doesn't work".
>
>     ### Here's the situation:
>
>     I have a dataset and its elements are totally ascending sorted by
>     some key (Int). Each element has a "next-pointer" to its
>     successor, which is just another field with the key of the
>     following element:
>
>     x0 -> x1 -> x2 -> x3 -> ... -> xn
>
>     The keys are not necessarily increasing by 1, so it may be that:
>     x0 has key 2 and x1 has key 10, x2 has 11, x3 has 25 and so on. I
>     need to process that set in the following way:
>
>     iterate:
>
>     find all pairs of elements where "next == key" BUT make sure no
>     element appears in multiple pairs
>
>     example: do pair (x0, x1), (x2, x3), (x4, x5), ... but don't pair
>     (x1, x2), (x3, x4), ...
>
>     then, if some condition is met, combine a pair
>
>     run above procedure again with switched pairing-condition:
>
>     example: do pair (x1, x2), (x3, x4), (x5, x6), ... do not pair
>     (x0, x1), (x2, x3), ..
>
>     I hope the problem is clear...
>
>
>     ### Now my approach: pseudo-scala-code:
>
>
>     val indexed = input.zipWithIndex
>
>     val flagged = indexed.map((i, el) => el.setFlag(i % 2 == 0))
>
>     val left = flagged.filter(el => el.flag)
>
>     val right = flagged.filter(el => !el.flag)
>
>     left.fullOuterJoin(right)
>
>      .where(el.next)
>
>      .equalTo(el.key)
>
>      ...
>
>
>     I attach my elements with a temporary key, that is increasing by
>     1, with zipWithIndex. Then, I map that tempKey to a boolean
>     joinFlag: true if key is even, false if key is odd. Then I filter
>     all elements with true, and put them in a dataset that is the left
>     side of the next == key join. The right side are all elements with
>     flag == false In the second run, I switch the flag construction to
>     el.setFlag(i % 2 != 0).
>
>     That actually works, there is only one problem:
>
>
>     ### The problem:
>
>
>     In my approach, I must not loose the total ordering of the data,
>     because only if that ordering is preserved, the assignment of
>     alternating join-flags works. Initially it is done by
>     range-partitioning and partition-sorting. However, that ordering
>     is destroyed, when data is shuffled for the join. And I can not
>     restore it, because I have to run the whole thing in an iteration,
>     and range-partitioning is not supported within iterations.
>
>
>     ### Help?
>
>     It sounds all very complicated, but the only thing I really have
>     to solve is that join without any element appearing in multiple
>     pairs (as described in "the situation"). If anyone has any idea
>     how to solve this, that person would make my day so hard...
>
>     Anyways, thanks for your time!
>
>     Best, Fridtjof
>
>
>
>     Am 01.02.16 um 11:32 schrieb Fridtjof Sander:
>>     Hi,
>>
>>     I have a problem which seems to be unsolvable in Flink at the
>>     moment (1.0-Snapshot, current master branch)
>>     and I would kindly ask for some input, ideas on alternative
>>     approaches or just a confirmatory "yup, that doesn't work".
>>
>>     ### Here's the situation:
>>
>>     I have a dataset and its elements are totally ascending sorted by
>>     some key (Int). Each element has a "next-pointer" to its
>>     successor, which is just another field with the key of the
>>     following element: x0 -> x1 -> x2 -> x3 -> ... -> xn The keys are
>>     not necessarily increasing by 1, so it may be that: x0 has key 2
>>     and x1 has key 10, x2 has 11, x3 has 25 and so on. I need to
>>     process that set in the following way: iterate: find all pairs of
>>     elements where "next == key" BUT make sure no element appears in
>>     multiple pairs example: do pair (x0, x1), (x2, x3), (x4, x5), ...
>>     but don't pair (x1, x2), (x3, x4), ... then, if some condition is
>>     met, combine a pair run above procedure again with switched
>>     pairing-condition: example: do pair (x1, x2), (x3, x4), (x5, x6),
>>     ... do not pair (x0, x1), (x2, x3), .. I hope the problem is
>>     clear... ### Now my approach: pseudo-scala-code:
>>
>>     val indexed = input.zipWithIndex val flagged = indexed.map((i,
>>     el) => el.setFlag(i % 2 == 0)) val left = flagged.filter(el =>
>>     el.flag)
>>     val right = flagged.filter(el => !el.flag)
>>     left.fullOuterJoin(right) .where(el.next) .equalTo(el.key) ... I
>>     attach my elements with a temporary key, that is increasing by 1,
>>     with zipWithIndex. Then, I map that tempKey to a boolean
>>     joinFlag: true if key is even, false if key is odd. Then I filter
>>     all elements with true, and put them in a dataset that is the
>>     left side of the next == key join. The right side are all
>>     elements with flag == false In the second run, I switch the flag
>>     construction to el.setFlag(i % 2 != 0). That actually works,
>>     there is only one problem: ### The problem: In my approach, I
>>     must not loose the total ordering of the data, because only if
>>     that ordering is preserved, the assignment of alternating
>>     join-flags works. Initially it is done by range-partitioning and
>>     partition-sorting. However, that ordering is destroyed, when data
>>     is shuffled for the join. And I can not restore it, because I
>>     have to run the whole thing in an iteration, and
>>     range-partitioning is not supported within iterations. ### Help?
>>     It sounds all very complicated, but the only thing I really have
>>     to solve is that join without any element appearing in multiple
>>     pairs (as described in "the situation"). If anyone has any idea
>>     how to solve this, that person would make my day so hard...
>>     Anyways, thanks for your time! Best, Fridtjof
>>
>
>


Re: join with no element appearing in multiple join-pairs

Posted by Till Rohrmann <tr...@apache.org>.
Hi Fridtjof,

I might miss something, but can’t you assign the ids once before starting
the iteration and then reuse them throughout the iterations? Of course you
would have to add another field to your input data but then you don’t have
to run the zipWithIndex for every iteration.

Cheers,
Till
​

On Mon, Feb 1, 2016 at 11:37 AM, Fridtjof Sander <
fsander@mailbox.tu-berlin.de> wrote:

> (tried to reformat)
>
>
> Hi,
>
> I have a problem which seems to be unsolvable in Flink at the moment
> (1.0-Snapshot, current master branch)
> and I would kindly ask for some input, ideas on alternative approaches or
> just a confirmatory "yup, that doesn't work".
>
> ### Here's the situation:
>
> I have a dataset and its elements are totally ascending sorted by some key
> (Int). Each element has a "next-pointer" to its successor, which is just
> another field with the key of the following element:
>
> x0 -> x1 -> x2 -> x3 -> ... -> xn
>
> The keys are not necessarily increasing by 1, so it may be that: x0 has
> key 2 and x1 has key 10, x2 has 11, x3 has 25 and so on. I need to process
> that set in the following way:
>
> iterate:
>
> find all pairs of elements where "next == key" BUT make sure no element
> appears in multiple pairs
>
> example: do pair (x0, x1), (x2, x3), (x4, x5), ... but don't pair (x1,
> x2), (x3, x4), ...
>
> then, if some condition is met, combine a pair
>
> run above procedure again with switched pairing-condition:
>
> example: do pair (x1, x2), (x3, x4), (x5, x6), ... do not pair (x0, x1),
> (x2, x3), ..
>
> I hope the problem is clear...
>
>
> ### Now my approach: pseudo-scala-code:
>
>
> val indexed = input.zipWithIndex
>
> val flagged = indexed.map((i, el) => el.setFlag(i % 2 == 0))
>
> val left = flagged.filter(el => el.flag)
>
> val right = flagged.filter(el => !el.flag)
>
> left.fullOuterJoin(right)
>
>  .where(el.next)
>
>  .equalTo(el.key)
>
>  ...
>
>
> I attach my elements with a temporary key, that is increasing by 1, with
> zipWithIndex. Then, I map that tempKey to a boolean joinFlag: true if key
> is even, false if key is odd. Then I filter all elements with true, and put
> them in a dataset that is the left side of the next == key join. The right
> side are all elements with flag == false In the second run, I switch the
> flag construction to el.setFlag(i % 2 != 0).
>
> That actually works, there is only one problem:
>
>
> ### The problem:
>
>
> In my approach, I must not loose the total ordering of the data, because
> only if that ordering is preserved, the assignment of alternating
> join-flags works. Initially it is done by range-partitioning and
> partition-sorting. However, that ordering is destroyed, when data is
> shuffled for the join. And I can not restore it, because I have to run the
> whole thing in an iteration, and range-partitioning is not supported within
> iterations.
>
>
> ### Help?
>
> It sounds all very complicated, but the only thing I really have to solve
> is that join without any element appearing in multiple pairs (as described
> in "the situation"). If anyone has any idea how to solve this, that person
> would make my day so hard...
>
> Anyways, thanks for your time!
>
> Best, Fridtjof
>
>
>
> Am 01.02.16 um 11:32 schrieb Fridtjof Sander:
>
> Hi,
>
> I have a problem which seems to be unsolvable in Flink at the moment
> (1.0-Snapshot, current master branch)
> and I would kindly ask for some input, ideas on alternative approaches or
> just a confirmatory "yup, that doesn't work".
>
> ### Here's the situation:
>
> I have a dataset and its elements are totally ascending sorted by some key
> (Int). Each element has a "next-pointer" to its successor, which is just
> another field with the key of the following element: x0 -> x1 -> x2 -> x3
> -> ... -> xn The keys are not necessarily increasing by 1, so it may be
> that: x0 has key 2 and x1 has key 10, x2 has 11, x3 has 25 and so on. I
> need to process that set in the following way: iterate: find all pairs of
> elements where "next == key" BUT make sure no element appears in multiple
> pairs example: do pair (x0, x1), (x2, x3), (x4, x5), ... but don't pair
> (x1, x2), (x3, x4), ... then, if some condition is met, combine a pair run
> above procedure again with switched pairing-condition: example: do pair
> (x1, x2), (x3, x4), (x5, x6), ... do not pair (x0, x1), (x2, x3), .. I hope
> the problem is clear... ### Now my approach: pseudo-scala-code:
>
> val indexed = input.zipWithIndex val flagged = indexed.map((i, el) =>
> el.setFlag(i % 2 == 0)) val left = flagged.filter(el => el.flag)
> val right = flagged.filter(el => !el.flag) left.fullOuterJoin(right)
> .where(el.next) .equalTo(el.key) ... I attach my elements with a temporary
> key, that is increasing by 1, with zipWithIndex. Then, I map that tempKey
> to a boolean joinFlag: true if key is even, false if key is odd. Then I
> filter all elements with true, and put them in a dataset that is the left
> side of the next == key join. The right side are all elements with flag ==
> false In the second run, I switch the flag construction to el.setFlag(i % 2
> != 0). That actually works, there is only one problem: ### The problem: In
> my approach, I must not loose the total ordering of the data, because only
> if that ordering is preserved, the assignment of alternating join-flags
> works. Initially it is done by range-partitioning and partition-sorting.
> However, that ordering is destroyed, when data is shuffled for the join.
> And I can not restore it, because I have to run the whole thing in an
> iteration, and range-partitioning is not supported within iterations. ###
> Help? It sounds all very complicated, but the only thing I really have to
> solve is that join without any element appearing in multiple pairs (as
> described in "the situation"). If anyone has any idea how to solve this,
> that person would make my day so hard... Anyways, thanks for your time!
> Best, Fridtjof
>
>
>

Re: join with no element appearing in multiple join-pairs

Posted by Fridtjof Sander <fs...@mailbox.tu-berlin.de>.
(tried to reformat)

Hi,

I have a problem which seems to be unsolvable in Flink at the moment 
(1.0-Snapshot, current master branch)
and I would kindly ask for some input, ideas on alternative approaches 
or just a confirmatory "yup, that doesn't work".

### Here's the situation:

I have a dataset and its elements are totally ascending sorted by some 
key (Int). Each element has a "next-pointer" to its successor, which is 
just another field with the key of the following element:

x0 -> x1 -> x2 -> x3 -> ... -> xn

The keys are not necessarily increasing by 1, so it may be that: x0 has 
key 2 and x1 has key 10, x2 has 11, x3 has 25 and so on. I need to 
process that set in the following way:

iterate:

find all pairs of elements where "next == key" BUT make sure no element 
appears in multiple pairs

example: do pair (x0, x1), (x2, x3), (x4, x5), ... but don't pair (x1, 
x2), (x3, x4), ...

then, if some condition is met, combine a pair

run above procedure again with switched pairing-condition:

example: do pair (x1, x2), (x3, x4), (x5, x6), ... do not pair (x0, x1), 
(x2, x3), ..

I hope the problem is clear...


### Now my approach: pseudo-scala-code:


val indexed = input.zipWithIndex

val flagged = indexed.map((i, el) => el.setFlag(i % 2 == 0))

val left = flagged.filter(el => el.flag)

val right = flagged.filter(el => !el.flag)

left.fullOuterJoin(right)

  .where(el.next)

  .equalTo(el.key)

  ...


I attach my elements with a temporary key, that is increasing by 1, with 
zipWithIndex. Then, I map that tempKey to a boolean joinFlag: true if 
key is even, false if key is odd. Then I filter all elements with true, 
and put them in a dataset that is the left side of the next == key join. 
The right side are all elements with flag == false In the second run, I 
switch the flag construction to el.setFlag(i % 2 != 0).

That actually works, there is only one problem:


### The problem:


In my approach, I must not loose the total ordering of the data, because 
only if that ordering is preserved, the assignment of alternating 
join-flags works. Initially it is done by range-partitioning and 
partition-sorting. However, that ordering is destroyed, when data is 
shuffled for the join. And I can not restore it, because I have to run 
the whole thing in an iteration, and range-partitioning is not supported 
within iterations.


### Help?

It sounds all very complicated, but the only thing I really have to 
solve is that join without any element appearing in multiple pairs (as 
described in "the situation"). If anyone has any idea how to solve this, 
that person would make my day so hard...

Anyways, thanks for your time!

Best, Fridtjof



Am 01.02.16 um 11:32 schrieb Fridtjof Sander:
> Hi,
>
> I have a problem which seems to be unsolvable in Flink at the moment 
> (1.0-Snapshot, current master branch)
> and I would kindly ask for some input, ideas on alternative approaches 
> or just a confirmatory "yup, that doesn't work".
>
> ### Here's the situation:
>
> I have a dataset and its elements are totally ascending sorted by some 
> key (Int). Each element has a "next-pointer" to its successor, which 
> is just another field with the key of the following element: x0 -> x1 
> -> x2 -> x3 -> ... -> xn The keys are not necessarily increasing by 1, 
> so it may be that: x0 has key 2 and x1 has key 10, x2 has 11, x3 has 
> 25 and so on. I need to process that set in the following way: 
> iterate: find all pairs of elements where "next == key" BUT make sure 
> no element appears in multiple pairs example: do pair (x0, x1), (x2, 
> x3), (x4, x5), ... but don't pair (x1, x2), (x3, x4), ... then, if 
> some condition is met, combine a pair run above procedure again with 
> switched pairing-condition: example: do pair (x1, x2), (x3, x4), (x5, 
> x6), ... do not pair (x0, x1), (x2, x3), .. I hope the problem is 
> clear... ### Now my approach: pseudo-scala-code:
>
> val indexed = input.zipWithIndex val flagged = indexed.map((i, el) => 
> el.setFlag(i % 2 == 0)) val left = flagged.filter(el => el.flag)
> val right = flagged.filter(el => !el.flag) left.fullOuterJoin(right) 
> .where(el.next) .equalTo(el.key) ... I attach my elements with a 
> temporary key, that is increasing by 1, with zipWithIndex. Then, I map 
> that tempKey to a boolean joinFlag: true if key is even, false if key 
> is odd. Then I filter all elements with true, and put them in a 
> dataset that is the left side of the next == key join. The right side 
> are all elements with flag == false In the second run, I switch the 
> flag construction to el.setFlag(i % 2 != 0). That actually works, 
> there is only one problem: ### The problem: In my approach, I must not 
> loose the total ordering of the data, because only if that ordering is 
> preserved, the assignment of alternating join-flags works. Initially 
> it is done by range-partitioning and partition-sorting. However, that 
> ordering is destroyed, when data is shuffled for the join. And I can 
> not restore it, because I have to run the whole thing in an iteration, 
> and range-partitioning is not supported within iterations. ### Help? 
> It sounds all very complicated, but the only thing I really have to 
> solve is that join without any element appearing in multiple pairs (as 
> described in "the situation"). If anyone has any idea how to solve 
> this, that person would make my day so hard... Anyways, thanks for 
> your time! Best, Fridtjof
>