Posted to user@flink.apache.org by Claudia Wegmann <c....@kasasi.de> on 2016/08/08 12:26:52 UTC

max aggregator doesn't work as expected

Hey,

I have some questions about aggregate functions such as max or min. Take the following example:

// Create a stream with event time, where Data contains an ID, a timestamp and a temperature value.
// Note: java.util.Date counts years from 1900 and months from 0, so (116, 8, 8, ...) is 8 September 2016.
DataStream<Data> oneStream = env.fromElements(
        new Data(123, new Date(116, 8, 8, 11, 11, 11), 5),
        new Data(124, new Date(116, 8, 8, 12, 10, 11), 8),
        new Data(125, new Date(116, 8, 8, 12, 12, 11), 10),
        new Data(126, new Date(116, 8, 8, 12, 15, 11), 2))
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Data>() {
            @Override
            public long extractAscendingTimestamp(final Data data) {
                return data.getTimestamp().getTime();
            }
        });

// Calculate the max temperature value per hour.
DataStream<Data> maxStream = oneStream
        .keyBy("id")
        .timeWindow(Time.hours(1))
        .max("temp");
maxStream.print();

Here are the questions I ran into:

1.)    Why does the resulting stream "maxStream" have to be of type Data? Given the documentation and the contrast with maxBy, I would expect the resulting stream to be of type Integer.

2.)    Executing the code as it is above, I would expect the printed result to be the data with ID 124 and ID 125. However, the execution prints all 4 data sets. Did I get this example totally wrong? How would the code need to look to get the expected result?

3.)    Can you point me to a good example of using time windows and aggregates? I couldn't find one that answers the above questions.

Thanks for your help and best wishes,
Claudia



Re: max aggregator doesn't work as expected

Posted by Claudia Wegmann <c....@kasasi.de>.
So if I change the input data to the following (I added a uID value to identify the individual data sets):

        new Data(1, 123, new Date(116, 8, 8, 11, 11, 11), 5),
        new Data(2, 123, new Date(116, 8, 8, 12, 10, 11), 8),
        new Data(3, 123, new Date(116, 8, 8, 12, 12, 11), 10),
        new Data(4, 123, new Date(116, 8, 8, 12, 15, 11), 2))

would the expected output for maxBy be:

       Data(1, 123, Date(116, 8, 8, 12, 10, 11), 8),
       Data(3, 123, Date(116, 8, 8, 12, 12, 11), 10),

and for max:

       Data(null, null, null, 8),
       Data(null, null, null, 10)?

My output from Flink is:
1> Data: uID-1|| ID-123 || timestamp-Thu Sep 08 11:11:11 CEST 2016 || temperatur-5
1> Data: uID-2|| ID-123 || timestamp-Thu Sep 08 12:10:11 CEST 2016 || temperatur-10

Here are the new questions:

1.)    So, does max not work as expected?

2.)    And doesn’t the time window start with the first incoming record? Obviously the first and second data sets are not in the same window. When does the time window start?

3.)    Why does the second record in the result have uID = 2 although it had uID = 3 in the input data set?

Thanks again and best,
Claudia
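
A note on question 2.) above: as far as I know, Flink's tumbling time windows are aligned to the epoch (for Time.hours(1), to full hours) and do not start with the first record of a key. The plain-Java sketch below is not Flink code; the class and method names are made up for illustration, and the event times are assumed to be CEST (UTC+2), as in the printed output. It only shows the arithmetic that puts the first record in a different hourly window than the other three.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;

// Illustrative only: epoch-aligned tumbling windows, not Flink's implementation.
final class WindowStartSketch {

    // Start of the epoch-aligned window of the given size that contains the timestamp.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    public static void main(String[] args) {
        long oneHour = Duration.ofHours(1).toMillis();

        // The four event times from the example, assumed to be in CEST (UTC+2).
        String[] eventTimes = {
            "2016-09-08T11:11:11+02:00",
            "2016-09-08T12:10:11+02:00",
            "2016-09-08T12:12:11+02:00",
            "2016-09-08T12:15:11+02:00"
        };

        for (String eventTime : eventTimes) {
            long ts = OffsetDateTime.parse(eventTime).toInstant().toEpochMilli();
            Instant start = Instant.ofEpochMilli(windowStart(ts, oneHour));
            System.out.println(eventTime + " -> window starting at " + start);
        }
    }
}
```

Under these assumptions the first record falls into the window starting at 09:00 UTC (11:00 CEST) and the other three into the window starting at 10:00 UTC (12:00 CEST), which would match the two result lines printed above.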

From: Robert Metzger [mailto:rmetzger@apache.org]
Sent: Monday, 8 August 2016 15:52
To: user@flink.apache.org
Subject: Re: max aggregator doesn't work as expected

I have to admit that the difference between the two methods is subtle, and in my opinion it doesn't make much sense to have the two variants.

- max() returns a tuple with the max value at the specified position; the other fields of the tuple/POJO are undefined
- maxBy() returns the tuple that has the max value at the specified position (the other fields are retained)


Re: max aggregator doesn't work as expected

Posted by Robert Metzger <rm...@apache.org>.
I have to admit that the difference between the two methods is subtle, and
in my opinion it doesn't make much sense to have the two variants.

- max() returns a tuple with the max value at the specified position; the
other fields of the tuple/POJO are undefined
- maxBy() returns the tuple that has the max value at the specified position
(the other fields are retained)
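
The difference between the two variants can be sketched in plain Java. This is not Flink's implementation: Reading is a hypothetical stand-in for the Data POJO, and the max-like method assumes that the non-aggregated fields come from the first record in the window. That assumption merely reproduces the output reported in this thread; as stated above, those fields are undefined and should not be relied on.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: contrasts max-like and maxBy-like aggregation over one window.
final class MaxVersusMaxBySketch {

    // Hypothetical stand-in for the Data POJO from the thread.
    static final class Reading {
        final int uid;
        final int id;
        final int temp;

        Reading(int uid, int id, int temp) {
            this.uid = uid;
            this.id = id;
            this.temp = temp;
        }

        @Override
        public String toString() {
            return "uID-" + uid + " ID-" + id + " temp-" + temp;
        }
    }

    // maxBy-like: return the whole record that holds the largest temp.
    static Reading maxByTemp(List<Reading> window) {
        Reading best = window.get(0);
        for (Reading r : window) {
            if (r.temp > best.temp) {
                best = r;
            }
        }
        return best;
    }

    // max-like (assumed behaviour): keep the first record's other fields
    // and overwrite only the aggregated field with the largest temp.
    static Reading maxTemp(List<Reading> window) {
        Reading first = window.get(0);
        int max = first.temp;
        for (Reading r : window) {
            max = Math.max(max, r.temp);
        }
        return new Reading(first.uid, first.id, max);
    }

    public static void main(String[] args) {
        // The three records that share the 12:00 to 13:00 window in the thread.
        List<Reading> window = Arrays.asList(
                new Reading(2, 123, 8),
                new Reading(3, 123, 10),
                new Reading(4, 123, 2));

        System.out.println("max:   " + maxTemp(window));
        System.out.println("maxBy: " + maxByTemp(window));
    }
}
```

With these three records the max-like result carries uID 2 together with temperature 10, while the maxBy-like result is the original record with uID 3. If the complete record matters, maxBy is the variant to use.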

On Mon, Aug 8, 2016 at 2:55 PM, Claudia Wegmann <c....@kasasi.de> wrote:

> OK, found my mistake regarding question 2.). I key by the id value and
> gave all the data sets different values there. So of course all 4 data sets
> are printed. Sorry :)
>
> But question 1.) still remains.

Re: max aggregator doesn't work as expected

Posted by Claudia Wegmann <c....@kasasi.de>.
OK, found my mistake regarding question 2.). I key by the id value and gave all the data sets different values there. So of course all 4 data sets are printed. Sorry :)
But question 1.) still remains.
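
The keying effect described here can be illustrated with a small plain-Java sketch. It is not Flink code: it ignores windows entirely, the class and method names are invented for illustration, and each row is simply an {id, temp} pair. It only shows that a per-key max yields one result per distinct key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: a per-key max over {id, temp} rows, ignoring time windows.
final class KeyedMaxSketch {

    // Returns the largest temp seen for each id, in first-seen key order.
    static Map<Integer, Integer> maxPerKey(int[][] rows) {
        Map<Integer, Integer> result = new LinkedHashMap<>();
        for (int[] row : rows) {
            result.merge(row[0], row[1], Integer::max);
        }
        return result;
    }

    public static void main(String[] args) {
        // Four distinct ids, as in the first example: every key has exactly one record.
        int[][] distinctIds = {{123, 5}, {124, 8}, {125, 10}, {126, 2}};
        System.out.println(maxPerKey(distinctIds));

        // One shared id, as in the follow-up example: a single key collects all records.
        int[][] sharedId = {{123, 5}, {123, 8}, {123, 10}, {123, 2}};
        System.out.println(maxPerKey(sharedId));
    }
}
```

With four distinct ids there are four groups and therefore four results, each equal to its only record. With a shared id there is one group, and the time windows then decide how that group is split further.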
