Posted to dev@flink.apache.org by Aljoscha Krettek <al...@apache.org> on 2015/12/16 15:26:40 UTC

[DISCUSS] Time Behavior in Streaming Jobs (Event-time/processing-time)

Hi,
I thought a bit about how to improve the handling of time in Flink, mostly as it relates to windows. The problem is that mixing processing-time and event-time windows in one topology is very hard (impossible) right now. Let me explain it with this example:

val env: StreamExecutionEnvironment = …

env.setStreamTimeCharacteristic(EventTime)

val input = <some stream>

val quickResults = input
  .keyBy(…)
  .window(TumblingTimeWindows.of(Time.seconds(5)))
  .trigger(ProcessingTimeTrigger.create())
  .sum(1)

val slowResults = input
  .keyBy(…)
  .window(TumblingTimeWindows.of(Time.seconds(5)))
  // .trigger(EventTimeTrigger.create()) this is the default trigger, so no need to set it, really
  .sum(1)

The idea is that you want to have fast, but possibly inaccurate, results using processing time and correct, but maybe slower, results using event-time windowing.

The problem is that the current API tries to solve two problems:
 1. We want to have a way to just say “time window” and then let the system instantiate the correct window-operator based on the time characteristic
 2. We want to have flexibility to allow users to mix ’n match processing-time and event-time windows

The above example does not work because both operators would assign elements to windows based on the event-time timestamp. The first window therefore triggers event-time windows by processing time, which has unexpected (wrong) results.
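
A minimal, Flink-free sketch of this mismatch follows. The class and method names are made up for illustration, and the sketch assumes that a processing-time trigger simply compares the end of the window against the wall clock. An element whose event timestamp lags behind the wall clock is assigned to a window that, measured in processing time, is already over:

```java
// Hypothetical, Flink-free model of the mismatch described above.
final class MixedTimeDemo {
    /** Start of the tumbling window of length sizeMs containing timestampMs (non-negative input). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Assumption: a processing-time trigger treats a window as due once the wall clock reaches its end. */
    static boolean dueInProcessingTime(long windowStartMs, long sizeMs, long wallClockMs) {
        return wallClockMs >= windowStartMs + sizeMs;
    }

    public static void main(String[] args) {
        long size = 5_000L;
        long eventTimestamp = 12_000L; // when the event happened
        long wallClock = 60_000L;      // when the operator processes it

        long byEventTime = windowStart(eventTimestamp, size);
        long byProcessingTime = windowStart(wallClock, size);

        System.out.println("window start by event time:      " + byEventTime);
        System.out.println("window start by processing time: " + byProcessingTime);
        System.out.println("event-time window already due on the wall clock: "
                + dueInProcessingTime(byEventTime, size, wallClock));
    }
}
```

In this model the window chosen from the event timestamp and the moment the trigger fires are measured on two unrelated clocks, which is the inconsistency the example runs into.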

I see three solutions to this:
 1. Remove setStreamTimeCharacteristic(), let users always specify exactly what kind of windowing they want
 2. Keep setStreamTimeCharacteristic() but only employ the magic that decides on the window operator for the special .timeWindow() call. Have two different window assigners per window type (that is, for TumblingTimeWindows, SlidingTimeWindows, SessionWindows, ...), one for processing time and one for event time, so that users can specify exactly what they want
 3. Keep setStreamTimeCharacteristic() and have three window assigners per window type, one for processing-time, one for event-time and one that automatically decides based on the time characteristic
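
As a rough illustration of option 2, here is a self-contained sketch in which each notion of time gets its own assigner and each assigner brings its matching default trigger. None of these types are Flink classes; the interface, the class names and the string-valued trigger are assumptions made only to show the shape of the idea:

```java
// Hypothetical sketch of option 2: one dedicated assigner per notion of time.
// None of these types are Flink classes; the names are illustrative only.
final class OptionTwoDemo {
    public static void main(String[] args) {
        long eventTimestamp = 12_000L; // timestamp carried by the element
        long wallClock = 60_000L;      // machine time when the element is processed

        WindowAssignerSketch quick = new TumblingProcessingTimeSketch(5_000L);
        WindowAssignerSketch slow = new TumblingEventTimeSketch(5_000L);

        System.out.println("quick: window " + quick.windowStart(eventTimestamp, wallClock)
                + ", trigger " + quick.defaultTrigger());
        System.out.println("slow:  window " + slow.windowStart(eventTimestamp, wallClock)
                + ", trigger " + slow.defaultTrigger());
    }
}

interface WindowAssignerSketch {
    /** Start of the window for an element, given its own timestamp and the current wall clock. */
    long windowStart(long elementTimestampMs, long wallClockMs);

    /** Name of the trigger that matches this assigner's notion of time. */
    String defaultTrigger();
}

final class TumblingEventTimeSketch implements WindowAssignerSketch {
    private final long sizeMs;

    TumblingEventTimeSketch(long sizeMs) { this.sizeMs = sizeMs; }

    @Override public long windowStart(long elementTimestampMs, long wallClockMs) {
        // Event time: only the element's own timestamp matters.
        return elementTimestampMs - (elementTimestampMs % sizeMs);
    }

    @Override public String defaultTrigger() { return "EventTimeTrigger"; }
}

final class TumblingProcessingTimeSketch implements WindowAssignerSketch {
    private final long sizeMs;

    TumblingProcessingTimeSketch(long sizeMs) { this.sizeMs = sizeMs; }

    @Override public long windowStart(long elementTimestampMs, long wallClockMs) {
        // Processing time: the element's timestamp is ignored.
        return wallClockMs - (wallClockMs % sizeMs);
    }

    @Override public String defaultTrigger() { return "ProcessingTimeTrigger"; }
}
```

With this split, assignment and triggering always agree on the clock, so the quick and slow pipelines from the example could coexist in one job regardless of the job-wide time characteristic.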

What do you think?

On a side note, I would also suggest removing AbstractTime, EventTime, and ProcessingTime and just keeping Time for specifying time.

Cheers,
Aljoscha

Re: [DISCUSS] Time Behavior in Streaming Jobs (Event-time/processing-time)

Posted by Aljoscha Krettek <al...@apache.org>.
@Fabian Ah sorry, I misread the second part. I don't know what I thought
there :D

But you are right, having one assigner leaves us in the same pickle that
we're in now. That's why I suggested the other options.

On Fri, 18 Dec 2015 at 15:01 Fabian Hueske <fh...@gmail.com> wrote:

> Nope, I'm +1 for option 2)
>
> Stephan suggested to have a single WindowAssigner implementation that
> receives the TimeCharacteristics as a parameter (if I understood him
> right). I would find that inconsistent with the dedicated TimeTrigger
> implementations for event and processing time.

Re: [DISCUSS] Time Behavior in Streaming Jobs (Event-time/processing-time)

Posted by Fabian Hueske <fh...@gmail.com>.
Nope, I'm +1 for option 2)

Stephan suggested to have a single WindowAssigner implementation that
receives the TimeCharacteristics as a parameter (if I understood him
right). I would find that inconsistent with the dedicated TimeTrigger
implementations for event and processing time.

2015-12-18 14:52 GMT+01:00 Aljoscha Krettek <al...@apache.org>:

> @Fabian So you would actually be for option 3) of my initial proposals?
>
> @Stephan What do you mean by that? Would users set a time characteristic
> per job or per window assigner in your suggestion?

Re: [DISCUSS] Time Behavior in Streaming Jobs (Event-time/processing-time)

Posted by Aljoscha Krettek <al...@apache.org>.
@Fabian So you would actually be for option 3) of my initial proposals?

@Stephan What do you mean by that? Would users set a time characteristic
per job or per window assigner in your suggestion?

On Fri, 18 Dec 2015 at 12:10 Fabian Hueske <fh...@gmail.com> wrote:

> +1 for the second approach.
>
> Regarding Stephan's comment, I think it would be better to have dedicated
> WindowAssigner classes. Otherwise, this becomes inconsistent with the
> dedicated Event/ProcessingTimeTriggers.

Re: [DISCUSS] Time Behavior in Streaming Jobs (Event-time/processing-time)

Posted by Fabian Hueske <fh...@gmail.com>.
+1 for the second approach.

Regarding Stephan's comment, I think it would be better to have dedicated
WindowAssigner classes. Otherwise, this becomes inconsistent with the
dedicated Event/ProcessingTimeTriggers.

2015-12-18 12:03 GMT+01:00 Stephan Ewen <se...@apache.org>:

> I am also in favor of option (2).
>
> We could also pass the TimeCharacteristic to for example the
> "SlidingTimeWindows". Then there is one class, users can explicitly choose
> the characteristic of choice, and when nothing is specified, the
> default time characteristic is chosen.

Re: [DISCUSS] Time Behavior in Streaming Jobs (Event-time/processing-time)

Posted by Stephan Ewen <se...@apache.org>.
I am also in favor of option (2).

We could also pass the TimeCharacteristic to for example the
"SlidingTimeWindows". Then there is one class, users can explicitly choose
the characteristic of choice, and when nothing is specified, the
default time characteristic is chosen.
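
A possible shape for this, sketched without Flink: a single assigner class with two factory methods, one of which takes the characteristic explicitly. The enum and the class below are local stand-ins invented for this sketch, not the real TimeCharacteristic or SlidingTimeWindows:

```java
// Hypothetical sketch of a single assigner class that optionally takes the
// time characteristic. The enum and class are local stand-ins, not Flink types.
final class SingleAssignerDemo {
    enum Characteristic { PROCESSING_TIME, EVENT_TIME }

    static final class SlidingWindowsSketch {
        private final long sizeMs;
        private final long slideMs;
        private final Characteristic explicit; // null means: defer to the job-wide default

        private SlidingWindowsSketch(long sizeMs, long slideMs, Characteristic explicit) {
            this.sizeMs = sizeMs;
            this.slideMs = slideMs;
            this.explicit = explicit;
        }

        /** No characteristic given: the job-wide default applies. */
        static SlidingWindowsSketch of(long sizeMs, long slideMs) {
            return new SlidingWindowsSketch(sizeMs, slideMs, null);
        }

        /** Characteristic chosen explicitly for this one window. */
        static SlidingWindowsSketch of(long sizeMs, long slideMs, Characteristic characteristic) {
            return new SlidingWindowsSketch(sizeMs, slideMs, characteristic);
        }

        Characteristic resolve(Characteristic jobDefault) {
            return explicit != null ? explicit : jobDefault;
        }

        String describe(Characteristic jobDefault) {
            return "sliding(" + sizeMs + " ms, " + slideMs + " ms) in " + resolve(jobDefault);
        }
    }

    public static void main(String[] args) {
        Characteristic jobDefault = Characteristic.EVENT_TIME;
        System.out.println(SlidingWindowsSketch.of(10_000L, 5_000L).describe(jobDefault));
        System.out.println(SlidingWindowsSketch
                .of(10_000L, 5_000L, Characteristic.PROCESSING_TIME).describe(jobDefault));
    }
}
```

The trade-off against dedicated classes is that the notion of time becomes a runtime parameter of the assigner rather than part of its type.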

On Thu, Dec 17, 2015 at 11:41 AM, Maximilian Michels <mx...@apache.org> wrote:

> Hi Aljoscha,
>
> I'm in favor of option 2: Keep the setStreamTimeCharacteristic to set
> the default time behavior. Then add a method to the operators to set a
> custom time behavior.
>
> The problem is illustrated by this method in SlidingTimeWindows:
>
> @Override
> public Trigger<Object, TimeWindow>
> getDefaultTrigger(StreamExecutionEnvironment env) {
>    if (env.getStreamTimeCharacteristic() ==
> TimeCharacteristic.ProcessingTime) {
>       return ProcessingTimeTrigger.create();
>    } else {
>       return EventTimeTrigger.create();
>    }
> }
>
> That just needs to be fixed, e.g. by having a dedicated
> setTimeCharacteristic(..) on the operator.
>
> +1 for removing AbstractTime, EventTime, and ProcessingTime.
>
> Cheers,
> Max

Re: [DISCUSS] Time Behavior in Streaming Jobs (Event-time/processing-time)

Posted by Maximilian Michels <mx...@apache.org>.
Hi Aljoscha,

I'm in favor of option 2: Keep the setStreamTimeCharacteristic to set
the default time behavior. Then add a method to the operators to set a
custom time behavior.

The problem is illustrated by this method in SlidingTimeWindows:

@Override
public Trigger<Object, TimeWindow>
getDefaultTrigger(StreamExecutionEnvironment env) {
   if (env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
      return ProcessingTimeTrigger.create();
   } else {
      return EventTimeTrigger.create();
   }
}

That just needs to be fixed, e.g. by having a dedicated
setTimeCharacteristic(..) on the operator.
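
One way such an override could look, as a Flink-free sketch. The operator class, its setter and the string-valued triggers are assumptions for illustration and do not mirror an existing Flink API:

```java
// Hypothetical model of a per-operator override; not Flink's actual operator API.
final class OperatorOverrideDemo {
    enum Characteristic { PROCESSING_TIME, EVENT_TIME }

    static final class WindowOperatorSketch {
        private Characteristic override; // stays null until the user sets it

        WindowOperatorSketch setTimeCharacteristic(Characteristic characteristic) {
            this.override = characteristic;
            return this;
        }

        /** Same branching as getDefaultTrigger above, except the operator's own setting wins. */
        String defaultTrigger(Characteristic envDefault) {
            Characteristic effective = (override != null) ? override : envDefault;
            return effective == Characteristic.PROCESSING_TIME
                    ? "ProcessingTimeTrigger"
                    : "EventTimeTrigger";
        }
    }

    public static void main(String[] args) {
        Characteristic envDefault = Characteristic.EVENT_TIME;

        WindowOperatorSketch slow = new WindowOperatorSketch();
        WindowOperatorSketch quick = new WindowOperatorSketch()
                .setTimeCharacteristic(Characteristic.PROCESSING_TIME);

        System.out.println("slow:  " + slow.defaultTrigger(envDefault));
        System.out.println("quick: " + quick.defaultTrigger(envDefault));
    }
}
```

For this to address the original problem, the same effective characteristic would also have to drive window assignment, not only the choice of trigger.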

+1 for removing AbstractTime, EventTime, and ProcessingTime.

Cheers,
Max


Re: [DISCUSS] Time Behavior in Streaming Jobs (Event-time/processing-time)

Posted by Kostas Tzoumas <kt...@apache.org>.
Aljoscha, thanks for starting this discussion. I think this will be very
important to get right.

Can you explain a bit more why the results are "wrong"? I understand that
window panes are built on event timestamps (as intended), but fired at
regular intervals instead of watermarks. Why is this wrong and what would
be the correct (intended) result?

I think we should keep setStreamTimeCharacteristic(). It makes the simple
case simple.
