Posted to user@flink.apache.org by Plamen Paskov <pl...@next-stream.com> on 2017/12/15 10:55:21 UTC

consecutive stream aggregations

Hi,

I'm trying to calculate the running average of session length, and I want
to trigger the computation at a regular interval, say every 2 minutes.
I'm trying to do it like this:

package flink;

import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.TimeZone;


public class StreamingJob {
     public static void main(String[] args) throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

         SingleOutputStreamOperator<Event> sessions = env
                 .socketTextStream("localhost",9000,"\n")
                 .map(new MapFunction<String, Event>() {
                     @Override public Event map(String value) throws Exception {
                         String[] row = value.split(",");
                         return new Event(Long.valueOf(row[0]), row[1], Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
                     }
                 })
                 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
                     @Override public long extractTimestamp(Event element) {
                         return element.timestamp;
                     }
                 })
                 .keyBy("userId", "sessionId")
                 .maxBy("length");


         sessions
                 .timeWindowAll(Time.seconds(60), Time.seconds(30))
                 .apply(new AllWindowFunction<Event, Avg, TimeWindow>() {
                     @Override public void apply(TimeWindow window, Iterable<Event> values, Collector<Avg> out) throws Exception {
                         long sum = 0;
                         int count = 0;

                         for (Event event : values) {
                             sum += event.length;
                             count++;
                         }

                         // cast to double, otherwise sum / count is integer division
                         double avg = (double) sum / count;
                         LocalDateTime windowStart = LocalDateTime.ofInstant(Instant.ofEpochMilli(window.getStart()), TimeZone.getDefault().toZoneId());
                         LocalDateTime windowEnd = LocalDateTime.ofInstant(Instant.ofEpochMilli(window.getEnd()), TimeZone.getDefault().toZoneId());
                         out.collect(new Avg(avg, windowStart.toString(), windowEnd.toString()));
                     }
                 });

         env.execute();
     }

     @AllArgsConstructor @NoArgsConstructor @ToString public static class Avg {
         public double length;
         public String windowStart;
         public String windowEnd;
     }

     @AllArgsConstructor @NoArgsConstructor @ToString public static class Event {
         public long userId;
         public String sessionId;
         public long length;
         public long timestamp;
     }
}

First I want to extract the last session event for every user session, because it contains the total session length. Then I want to calculate the average session length based on
the output of the previous operation (the sessions variable).

Example:

1,s1,100,2017-12-13 11:58:01
1,s1,150,2017-12-13 11:58:02
1,s1,160,2017-12-13 11:58:03
2,s1,100,2017-12-13 11:58:04

The sessions variable should contain these rows:
1,s1,160,2017-12-13 11:58:03
2,s1,100,2017-12-13 11:58:04

but instead it emits the current max-length row for every incoming event (maxBy is a rolling aggregate), rather than one final row per group.

Questions:
- How do I collect the data for all groups in the sessions variable?
- Is there another way to achieve this functionality? With my implementation the average will be computed on a single node, because sessions is of type SingleOutputStreamOperator<Event>.
- Can I use ContinuousEventTimeTrigger to trigger at regular intervals?

Thanks


Re: consecutive stream aggregations

Posted by Ufuk Celebi <uc...@apache.org>.
You have to specify a window for this to work:

stream
  .keyBy(<key>)
  .timeWindow(<time>)
  .aggregate(<aggregateFunction>)



On Fri, Dec 15, 2017 at 3:04 PM, Plamen Paskov
<pl...@next-stream.com> wrote:
> Hi Ufuk,
>
> Thanks for the answer. It looks like in theory the accumulators are the
> solution to my problem, but as I'm working on a KeyedStream it's not
> possible to call aggregate() with an AggregateFunction implementation. Am I
> missing something?
> [...]

Re: consecutive stream aggregations

Posted by Plamen Paskov <pl...@next-stream.com>.
Hi Ufuk,

Thanks for the answer. It looks like in theory the accumulators are the
solution to my problem, but as I'm working on a KeyedStream it's not
possible to call aggregate() with an AggregateFunction implementation. Am I
missing something?


On 15.12.2017 15:46, Ufuk Celebi wrote:
> Hey Plamen,
>
> I think what you are looking for is the AggregateFunction, which you
> can use on keyed streams. The Javadoc [1] contains an example for your
> use case (averaging).
>
> – Ufuk
>
> [1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
> [...]


Re: consecutive stream aggregations

Posted by Ufuk Celebi <uc...@apache.org>.
You can do this by first doing a keyBy on userId and then emitting the
value you want to average (the session length). The output of this you
feed into the aggregate function, which groups by time and emits the
average per time window.

input.keyBy(user).flatMap(extractSessionLength()).timeWindowAll(time).aggregate(averageAggregate())
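
Spelled out, that might look like this (untested sketch; ExtractSessionLength
and AverageAggregate are assumed helpers, not from this thread):

// ExtractSessionLength: e.g. a RichFlatMapFunction<Event, Long> that uses
// keyed state to emit one length per finished user session.
DataStream<Long> lengths = input
        .keyBy("userId")
        .flatMap(new ExtractSessionLength());

// AverageAggregate: e.g. an AggregateFunction<Long, Tuple2<Long, Long>, Double>
// with a (sum, count) accumulator.
lengths
        .timeWindowAll(Time.minutes(2))
        .aggregate(new AverageAggregate());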

timeWindowAll is a non-parallel operator (parallelism 1), but that is
fine as long as you don't have huge throughput requirements. If that
becomes a problem, we would have to think about pre-aggregating in
parallel.

Does this help?

– Ufuk


On Fri, Dec 15, 2017 at 4:56 PM, Plamen Paskov
<pl...@next-stream.com> wrote:
> In my case I have a lot of users with one session per user. What I'm
> thinking is to evenly distribute the users, then accumulate, and finally
> merge all accumulators. The problem is that I don't know how to achieve this.
> [...]

Re: consecutive stream aggregations

Posted by Plamen Paskov <pl...@next-stream.com>.
In my case I have a lot of users with one session per user. What I'm
thinking is to evenly distribute the users, then accumulate, and finally
merge all accumulators. The problem is that I don't know how to achieve
this.


On 15.12.2017 17:52, Ufuk Celebi wrote:
> You can first aggregate the length per user and emit it downstream.
> Then you apply the all-window and average all the lengths. Does that
> make sense?
> [...]


Re: consecutive stream aggregations

Posted by Ufuk Celebi <uc...@apache.org>.
You can first aggregate the length per user and emit it downstream.
Then you apply the all-window and average all the lengths. Does that
make sense?
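
An untested sketch of that two-stage shape (SumCount is an assumed
AggregateFunction<Event, Tuple2<Long, Long>, Tuple2<Long, Long>> that sums
lengths and counts sessions; Tuple2, ReduceFunction and MapFunction come
from the usual Flink packages):

// Stage 1: partial (sum, count) per user -- runs with full parallelism.
DataStream<Tuple2<Long, Long>> partials = events
        .keyBy("userId")
        .timeWindow(Time.minutes(2))
        .aggregate(new SumCount());

// Stage 2: merge the partials and emit the global average.
// Parallelism 1, but it only sees one pre-aggregated record per user.
partials
        .timeWindowAll(Time.minutes(2))
        .reduce(new ReduceFunction<Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> reduce(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
                return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
            }
        })
        .map(new MapFunction<Tuple2<Long, Long>, Double>() {
            @Override
            public Double map(Tuple2<Long, Long> t) {
                return (double) t.f0 / t.f1;
            }
        });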

On Fri, Dec 15, 2017 at 4:48 PM, Plamen Paskov
<pl...@next-stream.com> wrote:
> I think I got your point.
> What happens now: in order to use aggregate() I need a window, but the
> window requires keyBy() if I want to parallelize the data. In my case it
> will not work, because if I create keyBy("userId") then the average
> will be calculated per userId, but I want the average across all users.
> What would be the solution in this case?
>
> Thanks
> [...]

Re: consecutive stream aggregations

Posted by Plamen Paskov <pl...@next-stream.com>.
I think I got your point.
What happens now: in order to use aggregate() I need a window, but the
window requires keyBy() if I want to parallelize the data. In my case it
will not work, because if I create keyBy("userId") then the average
will be calculated per userId, but I want the average across all users.
What would be the solution in this case?

Thanks


On 15.12.2017 15:46, Ufuk Celebi wrote:
> Hey Plamen,
>
> I think what you are looking for is the AggregateFunction, which you
> can use on keyed streams. The Javadoc [1] contains an example for your
> use case (averaging).
>
> – Ufuk
>
> [1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
> [...]


Re: consecutive stream aggregations

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Plamen,

I think what you are looking for is the AggregateFunction, which you
can use on keyed streams. The Javadoc [1] contains an example for your
use case (averaging).

– Ufuk

[1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
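
Roughly, the example there has this shape (paraphrased, not a verbatim copy
of the Javadoc; the accumulator is a (sum, count) pair):

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Averages Long values: the accumulator carries (running sum, running count).
public class AverageAggregate
        implements AggregateFunction<Long, Tuple2<Long, Long>, Double> {

    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Long value, Tuple2<Long, Long> acc) {
        return new Tuple2<>(acc.f0 + value, acc.f1 + 1L);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> acc) {
        return ((double) acc.f0) / acc.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
    }
}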

On Fri, Dec 15, 2017 at 11:55 AM, Plamen Paskov
<pl...@next-stream.com> wrote:
> [...]