Posted to dev@flink.apache.org by Lin Li <li...@gmail.com> on 2017/02/21 15:13:37 UTC

[DISCUSS] Should we supply a new Iterator instance for Functions with Iterable input(s) like CoGroupFunction ?

Hi,

While implementing https://issues.apache.org/jira/browse/FLINK-5498 via
"dataset.coGroup(another dataset)" with a generated CoGroupFunction
(interface: public void coGroup(Iterable<IN1> first, Iterable<IN2> second,
Collector<O> out)), I could not get the right results.

After debugging, I found that the backing iterator does not supply a new
instance when "Iterable.iterator()" is invoked (see
org.apache.flink.api.common.operators.util.ListKeyGroupedIterator). This
differs from the usual iterable collections in Java, whose iterator()
method supplies a new iterator instance for the collection on each call.
The behaviour is not mentioned in either the comments or the documentation.

IMO, returning a new iterator instance, as iterable collections do, would
probably be useful in other cases as well, so should we add this feature?
I would be grateful if someone could explain why ListKeyGroupedIterator
does not supply a new iterator instance.
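
A minimal sketch of the behaviour described above. It is not Flink's code:
SharedIteratorIterable and countElements are hypothetical names used only for
illustration, assuming an Iterable whose iterator() returns the same instance
on every call. A second for-each loop over such an Iterable sees no elements,
whereas a java.util collection can be traversed again.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class SinglePassDemo {

    /** Hypothetical stand-in: hands out the SAME iterator on every call. */
    static final class SharedIteratorIterable<T> implements Iterable<T> {
        private final Iterator<T> shared;

        SharedIteratorIterable(Iterator<T> source) {
            this.shared = source;
        }

        @Override
        public Iterator<T> iterator() {
            // No new instance: a second loop resumes where the first one stopped.
            return shared;
        }
    }

    /** Counts the elements seen by one for-each pass over the input. */
    static <T> int countElements(Iterable<T> input) {
        int n = 0;
        for (T ignored : input) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3);

        // A java.util collection creates a fresh iterator per call.
        System.out.println(countElements(data)); // 3
        System.out.println(countElements(data)); // 3

        Iterable<Integer> group = new SharedIteratorIterable<>(data.iterator());
        System.out.println(countElements(group)); // 3
        System.out.println(countElements(group)); // 0, the shared iterator is exhausted
    }
}
```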

What do you think?

Best, Lincoln

Re: [DISCUSS] Should we supply a new Iterator instance for Functions with Iterable input(s) like CoGroupFunction ?

Posted by Lin Li <li...@gmail.com>.
I created a JIRA issue, https://issues.apache.org/jira/browse/FLINK-5883,
and will work on it ASAP.

2017-02-22 21:01 GMT+08:00 Aljoscha Krettek <al...@apache.org>:

> I think this was mostly an oversight on my part that was possible because
> we didn't have good test-coverage that was enforcing correctness. Please go
> ahead and open an issue for re-adding the throw.

Re: [DISCUSS] Should we supply a new Iterator instance for Functions with Iterable input(s) like CoGroupFunction ?

Posted by Aljoscha Krettek <al...@apache.org>.
I think this was mostly an oversight on my part that was possible because
we didn't have good test-coverage that was enforcing correctness. Please go
ahead and open an issue for re-adding the throw.

On Wed, 22 Feb 2017 at 13:28 Lin Li <li...@gmail.com> wrote:

> Thank you for the answer!
>
> The discussion on FLINK-1023 is very clear to me. I agree with throwing a
> TraversableOnceException when the iterator is requested a second time.
>
> @Aljoscha, the git history shows that you removed the exception-throwing
> code in FLINK-1110. Would you mind if I create an issue and add it back?
>
> BTW, I have submitted a PR for FLINK-5498
> (https://github.com/apache/flink/pull/3379) that supports left/right outer
> joins with non-equi-join conditions via the coGroup operator with a
> generated OuterJoinCoGroupFunction.
> However, the current implementation is not memory safe for a
> many-to-one/many outer join, because it copies the opposite side's input
> into a List buffer. (This is not pretty, though it follows the guideline of
> only remembering the input data within a function call.) It is a
> workaround for now; in the long run, I think we should extend the runtime
> join operators to support such non-equi-join conditions. An implementation
> in the Table API layer cannot always ensure efficiency.
> Any suggestions on the current solution are welcome.
>
> Best, Lincoln
>

Re: [DISCUSS] Should we supply a new Iterator instance for Functions with Iterable input(s) like CoGroupFunction ?

Posted by Lin Li <li...@gmail.com>.
Thank you for the answer!

The discussion on FLINK-1023 is very clear to me. I agree with throwing a
TraversableOnceException when the iterator is requested a second time.

@Aljoscha, the git history shows that you removed the exception-throwing
code in FLINK-1110. Would you mind if I create an issue and add it back?

BTW, I have submitted a PR for FLINK-5498
(https://github.com/apache/flink/pull/3379) that supports left/right outer
joins with non-equi-join conditions via the coGroup operator with a
generated OuterJoinCoGroupFunction.
However, the current implementation is not memory safe for a
many-to-one/many outer join, because it copies the opposite side's input
into a List buffer. (This is not pretty, though it follows the guideline of
only remembering the input data within a function call.) It is a
workaround for now; in the long run, I think we should extend the runtime
join operators to support such non-equi-join conditions. An implementation
in the Table API layer cannot always ensure efficiency.
Any suggestions on the current solution are welcome.
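
The buffering workaround might look roughly like the sketch below. This is
an illustration under stated assumptions, not the code from the PR:
BufferedOuterJoinSketch, leftOuterJoin, and the string output format are
hypothetical, and plain Java types stand in for the inputs of one key group.
The right side is traversed exactly once and copied into a List, which is
where the memory cost for large groups comes from.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

class BufferedOuterJoinSketch {

    /**
     * Left outer join of two inputs under an arbitrary (non-equi) condition.
     * Each input is traversed only once; the right side is buffered in memory.
     */
    static <L, R> List<String> leftOuterJoin(
            Iterable<L> left, Iterable<R> right, BiPredicate<L, R> condition) {

        // Copy the right side so it can be scanned once per left element.
        List<R> buffer = new ArrayList<>();
        for (R r : right) {
            buffer.add(r);
        }

        List<String> out = new ArrayList<>();
        for (L l : left) {
            boolean matched = false;
            for (R r : buffer) {
                if (condition.test(l, r)) {
                    out.add(l + " | " + r);
                    matched = true;
                }
            }
            if (!matched) {
                // Outer join: keep the unmatched left element, padded with null.
                out.add(l + " | null");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> result = leftOuterJoin(
                Arrays.asList(1, 5), Arrays.asList(2, 3), (l, r) -> l < r);
        for (String row : result) {
            System.out.println(row);
        }
    }
}
```

The buffer grows with the size of the opposite side within a group, so this
approach only holds up while that side fits in memory.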

Best, Lincoln

Re: [DISCUSS] Should we supply a new Iterator instance for Functions with Iterable input(s) like CoGroupFunction ?

Posted by Ufuk Celebi <uc...@apache.org>.
On Wed, Feb 22, 2017 at 11:19 AM, Till Rohrmann <tr...@apache.org> wrote:
> In general, you’re right Lin Li that we don’t honour the Iterable contract
> which should allow you to create an arbitrary number of iterators over the
> data. Honestly, I’m not sure why we did this change because it’s not very
> intuitive. Maybe Ufuk can chime in because he opened FLINK-1023.

The discussion in the issue is pretty detailed. The best summary is
probably found in the PR description
https://github.com/apache/flink/pull/84:

"This patch allows the GroupReduce and the CoGroup to use the
beautiful foreach loop syntax."

Originally, the PR ensured that a TraversableOnceException was thrown.

Re: [DISCUSS] Should we supply a new Iterator instance for Functions with Iterable input(s) like CoGroupFunction ?

Posted by Till Rohrmann <tr...@apache.org>.
Hi Lin Li,

I think the oversight is rather that we don’t throw a TraversableOnceException
if you request more than one iterator, as is the case for the Iterables
used in non-collection mode. Without the exception, the collection and
non-collection modes behave differently.

In general, you’re right Lin Li that we don’t honour the Iterable contract
which should allow you to create an arbitrary number of iterators over the
data. Honestly, I’m not sure why we did this change because it’s not very
intuitive. Maybe Ufuk can chime in because he opened FLINK-1023.

To give you some background: we don’t allow the Iterable to return
multiple iterators over the data because we would have to keep all the data
around in case the user creates a new iterator. Given that the data might
grow quite big, this can be a burden. With the iterator contract you know
that you can free the resources once the current element has been processed.
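
A minimal sketch of such a traverse-once guard, assuming nothing about
Flink's internals: OnceIterable and TraversedTwiceException are hypothetical
names (the latter a local stand-in for TraversableOnceException). The
Iterable still works in a for-each loop, but a second call to iterator()
fails loudly instead of silently returning an exhausted iterator, so no data
has to be retained for a possible second pass.

```java
import java.util.Arrays;
import java.util.Iterator;

class TraverseOnceSketch {

    /** Hypothetical stand-in for an exception signalling a second traversal. */
    static final class TraversedTwiceException extends RuntimeException {
        TraversedTwiceException() {
            super("The Iterable can be iterated over only once.");
        }
    }

    /** Wraps a streaming iterator and allows iterator() to be called only once. */
    static final class OnceIterable<T> implements Iterable<T> {
        private final Iterator<T> source;
        private boolean handedOut;

        OnceIterable(Iterator<T> source) {
            this.source = source;
        }

        @Override
        public Iterator<T> iterator() {
            if (handedOut) {
                // Fail fast rather than returning the exhausted iterator again.
                throw new TraversedTwiceException();
            }
            handedOut = true;
            return source;
        }
    }

    public static void main(String[] args) {
        OnceIterable<Integer> values =
                new OnceIterable<>(Arrays.asList(1, 2, 3).iterator());

        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        System.out.println(sum); // 6

        try {
            values.iterator(); // second request
        } catch (TraversedTwiceException e) {
            System.out.println("second traversal rejected");
        }
    }
}
```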

Cheers,
Till

On Wed, Feb 22, 2017 at 11:10 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> this is probably an oversight. If it helps you implement the feature,
> please go ahead and add a sub-issue for solving the Iterator problem.
>
> Best,
> Aljoscha

Re: [DISCUSS] Should we supply a new Iterator instance for Functions with Iterable input(s) like CoGroupFunction ?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
this is probably an oversight. If it helps you implement the feature,
please go ahead and add a sub-issue for solving the Iterator problem.

Best,
Aljoscha

On Tue, 21 Feb 2017 at 16:13 Lin Li <li...@gmail.com> wrote:

> Hi,
>
> While implementing https://issues.apache.org/jira/browse/FLINK-5498 via
> "dataset.coGroup(another dataset)" with a generated CoGroupFunction
> (interface: public void coGroup(Iterable<IN1> first, Iterable<IN2> second,
> Collector<O> out)), I could not get the right results.
>
> After debugging, I found that the backing iterator does not supply a new
> instance when "Iterable.iterator()" is invoked (see
> org.apache.flink.api.common.operators.util.ListKeyGroupedIterator). This
> differs from the usual iterable collections in Java, whose iterator()
> method supplies a new iterator instance for the collection on each call.
> The behaviour is not mentioned in either the comments or the documentation.
>
> IMO, returning a new iterator instance, as iterable collections do, would
> probably be useful in other cases as well, so should we add this feature?
> I would be grateful if someone could explain why ListKeyGroupedIterator
> does not supply a new iterator instance.
>
> What do you think?
>
> Best, Lincoln
>