Posted to dev@flink.apache.org by Timo Walther <tw...@apache.org> on 2015/03/06 10:19:28 UTC

Semantic Properties and Functions with Iterables

Hey all,

I'm currently working a lot on the UDF static code analyzer, but I have
a general question about Semantic Properties that might also be
interesting for other users.

How is the ForwardedFields annotation interpreted for UDF functions with 
Iterables?

An example can be found in: 
org.apache.flink.examples.java.graph.EnumTrianglesBasic.TriadBuilder

Does this mean that each call of "collect" must happen in the same order
as the calls of "next"? That is not the case in the example above.
Or does the annotation only refer to the first iterator element?

Other examples:

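import java.util.Iterator;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Emits only the first record of each group, unchanged.
@ForwardedFields("*") // CORRECT?
public static class GroupReduce1
        implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    @Override
    public void reduce(Iterable<Tuple2<Long, Long>> values,
            Collector<Tuple2<Long, Long>> out) throws Exception {
        out.collect(values.iterator().next());
    }
}

// Emits, unchanged and in iterator order, every record whose f0 is 42.
@ForwardedFields("*") // NOT CORRECT?
public static class GroupReduce3
        implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    @Override
    public void reduce(Iterable<Tuple2<Long, Long>> values,
            Collector<Tuple2<Long, Long>> out) throws Exception {
        Iterator<Tuple2<Long, Long>> it = values.iterator();
        while (it.hasNext()) {
            Tuple2<Long, Long> t = it.next();
            if (t.f0 == 42) {
                out.collect(t);
            }
        }
    }
}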

Thanks in advance.

Regards,
Timo

Re: Semantic Properties and Functions with Iterables

Posted by Stephan Ewen <se...@apache.org>.
That is right, Timo, as far as I understand it.

On Mon, Mar 9, 2015 at 12:04 PM, Timo Walther <tw...@apache.org> wrote:

> Thanks for the clarification. If I have understood it correctly, forwarded
> fields are only interesting for key fields, right? So I will implement
> passing the key information to the analyzer so that it can take it into
> account.
>
> So if GroupReduce1 is grouped by f1, the result will be
> @ForwardedFields("1") in this example and not "*":
>
>     public static class GroupReduce1
>             implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>         @Override
>         public void reduce(Iterable<Tuple2<Long, Long>> values,
>                 Collector<Tuple2<Long, Long>> out) throws Exception {
>             out.collect(values.iterator().next());
>         }
>     }

Re: Semantic Properties and Functions with Iterables

Posted by Timo Walther <tw...@apache.org>.
Thanks for the clarification. If I have understood it correctly,
forwarded fields are only interesting for key fields, right? So I will
implement passing the key information to the analyzer so that it can take
it into account.

So if GroupReduce1 is grouped by f1, the result will be 
@ForwardedFields("1") in this example and not "*":

     public static class GroupReduce1
             implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
         @Override
         public void reduce(Iterable<Tuple2<Long, Long>> values,
                 Collector<Tuple2<Long, Long>> out) throws Exception {
             out.collect(values.iterator().next());
         }
     }
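
Timo's plan of passing the key information to the analyzer can be sketched in plain Java. This is a hypothetical model, not Flink's analyzer or optimizer API, and the class and method names are made up for illustration. It assumes the code analysis has already produced the set of field positions that a group function copies unchanged from input to output. For a group operation, the sketch reports only those fields that are also grouping keys. For GroupReduce1 grouped by f1, that turns "*" into "1".

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Flink: models the analyzer step described
// above, where fields a group function copies unchanged are only reported as
// forwarded if they are also grouping key fields.
final class KeyRestrictedForwardedFields {

    // copiedFields: field positions the code analysis found to be copied unchanged
    // keyFields:    the grouping key positions of the operator
    // returns the positions that are safe to report as forwarded fields
    static Set<Integer> restrictToKeys(Set<Integer> copiedFields, Set<Integer> keyFields) {
        Set<Integer> result = new TreeSet<>(copiedFields);
        result.retainAll(keyFields); // only key fields are constant within a group
        return result;
    }

    public static void main(String[] args) {
        // GroupReduce1 emits the first record of each group unchanged, so the
        // analysis alone finds both fields of the Tuple2 copied ("*").
        Set<Integer> copied = Set.of(0, 1);
        // The operator is grouped by f1.
        Set<Integer> keys = Set.of(1);
        System.out.println(restrictToKeys(copied, keys)); // prints [1]
    }
}
```

The intersection deliberately discards non-key fields even when they are copied verbatim. This follows the "+1 limiting to key fields" position in this thread rather than the more permissive per-record rules.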


On 08.03.2015 23:21, Fabian Hueske wrote:
> I added your comment and an answer to FLINK-1656:
>
> "Right, that's a good point.
>
> +1 limiting to key fields. That's much easier to reason about for users.
>
> However, I am not sure how it is implemented right now.
> I guess secondary sort info is already removed by the property filtering,
> but I need to verify that."


Re: Semantic Properties and Functions with Iterables

Posted by Fabian Hueske <fh...@gmail.com>.
I added your comment and an answer to FLINK-1656:

"Right, that's a good point.

+1 limiting to key fields. That's much easier to reason about for users.

However, I am not sure how it is implemented right now.
I guess secondary sort info is already removed by the property filtering,
but I need to verify that."

2015-03-08 21:53 GMT+01:00 Stephan Ewen <se...@apache.org>:

> Any other thoughts on this?

Re: Semantic Properties and Functions with Iterables

Posted by Stephan Ewen <se...@apache.org>.
Any other thoughts on this?

On Fri, Mar 6, 2015 at 12:12 PM, Stephan Ewen <se...@apache.org> wrote:

> I think the order of emitting elements is not part of the forwarded field
> properties, but would rather be a separate property that we do not have
> right now.
>
> At the moment, we would assume that all group operations destroy secondary
> orders.
>
> In that sense, forwarded fields in group operations only make sense for
> fields whose value is the same for all records in the group (key fields).

Re: Semantic Properties and Functions with Iterables

Posted by Stephan Ewen <se...@apache.org>.
I think the order of emitting elements is not part of the forwarded field
properties, but would rather be a separate property that we do not have
right now.

At the moment, we would assume that all group operations destroy secondary
orders.

In that sense, forwarded fields in group operations only make sense for
fields whose value is the same for all records in the group (key fields).
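
The point about key fields can be illustrated with a small plain-Java simulation. No Flink is involved, and records are modeled as hypothetical long[] pairs {f0, f1}. One group keyed on f0 arrives sorted on f1, and a made-up reduce body emits every record unchanged but in reverse order. Every output record is an exact copy of an input record, yet the secondary order on f1 is gone. f0 is still the same for every emitted record, so grouping on f0 survives any reordering within the group.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation, no Flink involved. A record is a long[] {f0, f1};
// the group is keyed on f0 and secondarily sorted on f1.
final class KeyFieldsSurviveReordering {

    // Hypothetical group-reduce body: emits every record unchanged, in reverse order.
    static List<long[]> reverseEmit(List<long[]> group) {
        List<long[]> out = new ArrayList<>();
        for (int i = group.size() - 1; i >= 0; i--) {
            out.add(group.get(i));
        }
        return out;
    }

    // True if the records are in ascending order on the given field.
    static boolean sortedOn(List<long[]> records, int field) {
        for (int i = 1; i < records.size(); i++) {
            if (records.get(i - 1)[field] > records.get(i)[field]) return false;
        }
        return true;
    }

    // True if every record carries the same value in the given field.
    static boolean constantOn(List<long[]> records, int field) {
        for (long[] r : records) {
            if (r[field] != records.get(0)[field]) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        // One group with key f0 = 7, secondarily sorted on f1.
        List<long[]> group = List.of(new long[] {7, 1}, new long[] {7, 2}, new long[] {7, 3});
        List<long[]> out = reverseEmit(group);
        System.out.println("sorted on f1 after reduce: " + sortedOn(out, 1));  // false
        System.out.println("f0 constant after reduce: " + constantOn(out, 0)); // true
    }
}
```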

On Fri, Mar 6, 2015 at 11:25 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Timo,
>
> there are several restrictions for forwarded fields of operators with
> iterator input.
> 1) forwarded fields must be emitted in the order in which they are received
> through the iterator
> 2) all forwarded fields of a record must stick together, i.e., if your
> function builds records from field 0 of the 1st, 3rd, 5th, ... record and
> field 1 of the 2nd, 4th, ... record coming through the iterator, these are
> not valid forwarded fields.
> 3) it is OK to completely filter out records coming through the iterator.
>
> The reason for these rules is that the optimizer uses forwarded fields to
> reason about physical data properties such as order and grouping. If you
> mix up the order of records or emit records which are composed from
> different input records, you might destroy a (secondary) order or grouping.
>
> Considering these rules, your second example is correct as well.
> In the case of the TriadBuilder, the information is correct as well (in the
> context of the program), because field 0 is used as the key. It is true,
> however, that there is a strange dependency between the function and the
> context in which it is used within the program. It would be better to
> remove the class annotation and add this information through the
> .withForwardedFields("0") method in the program, to make that clear.
>
> It is very good that you raise this point.
> This is currently not reflected in the documentation and should be made
> clear very soon. I will open a JIRA for that.
>
> Thanks, Fabian

Re: Semantic Properties and Functions with Iterables

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Timo,

there are several restrictions for forwarded fields of operators with
iterator input.
1) forwarded fields must be emitted in the order in which they are received
through the iterator
2) all forwarded fields of a record must stick together, i.e., if your
function builds records from field 0 of the 1st, 3rd, 5th, ... record and
field 1 of the 2nd, 4th, ... record coming through the iterator, these are
not valid forwarded fields.
3) it is OK to completely filter out records coming through the iterator.

The reason for these rules is that the optimizer uses forwarded fields to
reason about physical data properties such as order and grouping. If you
mix up the order of records or emit records which are composed from
different input records, you might destroy a (secondary) order or grouping.
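
One way to make the three rules concrete is a small checker over a hypothetical model. It is plain Java, not part of Flink, and not how the optimizer works internally. Each output record is described by the iterator positions of the input records its forwarded fields were copied from:

- Rule 2 requires all positions of one output record to be equal.
- Rules 1 and 3 together require the positions to increase from one output record to the next, with gaps allowed for filtered records.

Treating a re-emitted record as a violation is a conservative assumption; the rules above do not address it.

```java
import java.util.List;

// Hypothetical model, not Flink code. Each output record is described by an
// int[] holding, for every forwarded field, the 0-based iterator position of
// the input record that field was copied from.
final class ForwardedFieldRules {

    static boolean isValid(List<int[]> outputs) {
        int lastSource = -1;
        for (int[] sources : outputs) {
            // Rule 2: all forwarded fields of one output come from the same input record.
            for (int s : sources) {
                if (s != sources[0]) return false;
            }
            // Rule 1: outputs follow iterator order. Rule 3: filtering is fine, so
            // gaps are allowed. Conservatively, going backwards or re-emitting the
            // same input record is treated as a violation.
            if (sources.length > 0) {
                if (sources[0] <= lastSource) return false;
                lastSource = sources[0];
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // GroupReduce1: emits only the first record, both fields from input 0.
        System.out.println(isValid(List.of(new int[] {0, 0})));                   // true
        // GroupReduce3 on a group whose 2nd and 4th records have f0 == 42 (a filter).
        System.out.println(isValid(List.of(new int[] {1, 1}, new int[] {3, 3}))); // true
        // Field 0 from the 1st, 3rd, ... record and field 1 from the 2nd, 4th, ...
        System.out.println(isValid(List.of(new int[] {0, 1}, new int[] {2, 3}))); // false
        // Records emitted in reverse order.
        System.out.println(isValid(List.of(new int[] {1, 1}, new int[] {0, 0}))); // false
    }
}
```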

Considering these rules, your second example is correct as well.
In the case of the TriadBuilder, the information is correct as well (in the
context of the program), because field 0 is used as the key. It is true,
however, that there is a strange dependency between the function and the
context in which it is used within the program. It would be better to remove
the class annotation and add this information through the
.withForwardedFields("0") method in the program, to make that clear.

It is very good that you raise this point.
This is currently not reflected in the documentation and should be made
clear very soon. I will open a JIRA for that.

Thanks, Fabian



2015-03-06 10:19 GMT+01:00 Timo Walther <tw...@apache.org>:

> Hey all,
>
> I'm currently working a lot on the UDF static code analyzer, but I have a
> general question about Semantic Properties that might also be interesting
> for other users.
>
> How is the ForwardedFields annotation interpreted for UDF functions with
> Iterables?
>
> An example can be found in:
> org.apache.flink.examples.java.graph.EnumTrianglesBasic.TriadBuilder
>
> Does this mean that each call of "collect" must happen in the same order
> as the calls of "next"? That is not the case in the example above. Or
> does the annotation only refer to the first iterator element?
>
> Other examples:
>
> @ForwardedFields("*") // CORRECT?
> public static class GroupReduce1
>         implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>     @Override
>     public void reduce(Iterable<Tuple2<Long, Long>> values,
>             Collector<Tuple2<Long, Long>> out) throws Exception {
>         out.collect(values.iterator().next());
>     }
> }
>
> @ForwardedFields("*") // NOT CORRECT?
> public static class GroupReduce3
>         implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>     @Override
>     public void reduce(Iterable<Tuple2<Long, Long>> values,
>             Collector<Tuple2<Long, Long>> out) throws Exception {
>         Iterator<Tuple2<Long, Long>> it = values.iterator();
>         while (it.hasNext()) {
>             Tuple2<Long, Long> t = it.next();
>             if (t.f0 == 42) {
>                 out.collect(t);
>             }
>         }
>     }
> }
>
> Thanks in advance.
>
> Regards,
> Timo
>