Posted to user@beam.apache.org by Maurizio Sambati <ma...@viralize.com> on 2018/02/09 15:46:31 UTC

Fwd: Stateful processing with session window

Hi everyone,

I'm trying to write a simple pipeline to experiment with both stateful
processing and session windows.

I have an event stream where each event has a timestamp and a session key. I
want to group by session and enrich all events using a common per-session
state. In this example I'm just replacing each event with an incremental
counter.

So, let's say I have a source that emits an event every second and my stream
is [a, a, b, a, a, c, a, a, b, c, c, c, a, a] (I'm only writing the session
key, since the value doesn't matter for the issue I'm describing).

I want the following output: [<a, 0>, <a, 1>, <b, 0>, <a, 2>, <a, 3>, ...]
(the order doesn't actually matter).

Unfortunately my code doesn't work as I expected, and I can't figure out why
(to be honest I haven't found many resources on the topic). What I actually
get is something like:

a, 0
a, 1
b, 0
a, 0    <-- ???
a, 2    <-- ???
c, 0
...

This makes me wonder whether I have actually understood how state relates to
the key-window pair, or whether I have just misunderstood how
windowing/triggering works.

My pipeline looks something like:

p.apply(TextIO.read().from("input.json"))
 .apply(MapElements.via(new ParseTableRowJson()))
 .apply(new AugmentEvents())
 .apply(ParDo.of(new DoFn<KV<String, Long>, Void>() {
   @ProcessElement
   public void processElement(ProcessContext c) {
     LOG.info(c.element().getKey() + ": " + c.element().getValue());
   }
 }));

...

static class AugmentEvents
    extends PTransform<PCollection<TableRow>, PCollection<KV<String, Long>>> {

  @Override
  public PCollection<KV<String, Long>> expand(PCollection<TableRow> input) {
    return input
      .apply(ParDo.of(new ExtractSessionIdAndTimestamp()))
      .apply(new ComputeSessions());
  }
}


static class ComputeSessions
    extends PTransform<PCollection<KV<String, TableRow>>, PCollection<KV<String, Long>>> {

  @Override
  public PCollection<KV<String, Long>> expand(PCollection<KV<String, TableRow>> events) {
    return events
      .apply(Window.<KV<String, TableRow>>into(
              Sessions.withGapDuration(Duration.standardMinutes(10)))
          .triggering(AfterPane.elementCountAtLeast(1))
          .discardingFiredPanes()
          .withAllowedLateness(Duration.standardMinutes(10)))
      .apply(ParDo.of(new StatefulCount()));
  }
}

static class StatefulCount extends DoFn<KV<String, TableRow>, KV<String, Long>> {

  // Per-key (and per-window) counter.
  @StateId("storage")
  private final StateSpec<ValueState<Integer>> storageSpec =
      StateSpecs.value(VarIntCoder.of());

  @ProcessElement
  public void processElement(ProcessContext context, BoundedWindow window,
      @StateId("storage") ValueState<Integer> storage) {
    // read() returns null the first time this key is seen in this window.
    Integer val = storage.read();
    int current = (val == null) ? 0 : val;
    context.output(KV.of(context.element().getKey(), (long) current));
    storage.write(current + 1);
  }
}

Maurizio

Re: Stateful processing with session window

Posted by Kenneth Knowles <kl...@google.com>.
On Mon, Feb 12, 2018 at 1:09 AM, Maurizio Sambati <ma...@viralize.com>
wrote:

> Hi Carlos,
>
>> What I think is happening here is that the third 'a' you see is actually
>> in a different window from the other three a's. State being per key and
>> window means that it keeps state for each key-window pair; therefore, if
>> your 'a' counter is being restarted, it is probably because it is actually
>> a different state, and since the key is the same, the only possibility is
>> that the window is different.
>>
>
> Yeah, that was my initial guess too; that's why I questioned whether I had
> understood the semantics of session windows in the first place. Fortunately,
> as Kenneth pointed out, my understanding was correct, but this window model
> is not compatible with stateful processing yet.
>

I want to mention something here: we do know that state is compatible with
merging windows. In fact, triggers use the state mechanism in merging
windows "under the hood". The issue is connecting it to user-defined state.
Each runner is slightly different, though it is not terribly difficult for
any of them. For BagState, the runner will automatically combine the bags.
For CombiningState it will automatically use mergeAccumulators. ValueState
will probably not be supported for a while, and perhaps will eventually get
a merge callback.
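
For example, once a runner does support user state in merging windows, a
per-session counter kept in CombiningState (rather than ValueState) could be
merged automatically when session windows merge. Here is a rough, untested
sketch in the spirit of your StatefulCount (imports omitted like in your
snippet; the class name is just for illustration and I'm writing the
accumulator type from memory):

static class StatefulCountCombining extends DoFn<KV<String, TableRow>, KV<String, Long>> {

  // CombiningState instead of ValueState: when two session windows merge,
  // the runner can combine the two counts via the CombineFn's
  // mergeAccumulators, so the per-session count survives the merge.
  @StateId("count")
  private final StateSpec<CombiningState<Integer, int[], Integer>> countSpec =
      StateSpecs.combining(Sum.ofIntegers());

  @ProcessElement
  public void processElement(ProcessContext context,
      @StateId("count") CombiningState<Integer, int[], Integer> count) {
    // read() returns the combined value of everything added so far
    // (0 before the first add for this key and window).
    int current = count.read();
    context.output(KV.of(context.element().getKey(), (long) current));
    count.add(1);
  }
}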

Kenn

Re: Stateful processing with session window

Posted by Maurizio Sambati <ma...@viralize.com>.
Hi Carlos,

> What I think is happening here is that the third 'a' you see is actually in
> a different window from the other three a's. State being per key and window
> means that it keeps state for each key-window pair; therefore, if your 'a'
> counter is being restarted, it is probably because it is actually a
> different state, and since the key is the same, the only possibility is
> that the window is different.
>

Yeah, that was my initial guess too; that's why I questioned whether I had
understood the semantics of session windows in the first place. Fortunately,
as Kenneth pointed out, my understanding was correct, but this window model
is not compatible with stateful processing yet.

Maurizio

Re: Stateful processing with session window

Posted by Carlos Alonso <ca...@mrcalonso.com>.
Hi Maurizio, I'm not a very experienced user here (I'm actually just getting
started with all this), but I'll give this one a try and see if I can help.

What I think is happening here is that the third 'a' you see is actually in
a different window from the other three a's. State being per key and window
means that it keeps state for each key-window pair; therefore, if your 'a'
counter is being restarted, it is probably because it is actually a
different state, and since the key is the same, the only possibility is that
the window is different. You can try to debug your pipeline and check
whether my guess is right by also printing the window information of your
elements, for example with something like the snippet below.
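
For instance (untested), the logging DoFn at the end of your pipeline could
also take the window as a parameter and print it:

.apply(ParDo.of(new DoFn<KV<String, Long>, Void>() {
  @ProcessElement
  public void processElement(ProcessContext c, BoundedWindow window) {
    // Printing the window should show whether the "restarted" counters
    // live in a different session window for the same key.
    LOG.info(c.element().getKey() + ": " + c.element().getValue()
        + " in " + window);
  }
}));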

Hope it helps.


Re: Stateful processing with session window

Posted by Maurizio Sambati <ma...@viralize.com>.
Hi Kenneth,

What runner are you using? Are you trying this out in the DirectRunner? As
> far as I know, no runner supports stateful processing in session windows
> yet. It is probably a bug that your pipeline was accepted by the runner
> when it includes features that the runner cannot execute. It would be a
> real mistake to have missed this validation for the DirectRunner.
>

Oh, ok, got it. :-(
I was actually trying this on the DirectRunner and it didn't raise any
errors, so I guess no validation is performed there.

Support for stateful processing in merging windows is definitely important.
> There's only a Jira filed for Dataflow [1] as far as I can tell. I just
> cloned it for the DirectRunner [2] since that is how you would test your
> pipeline. If you want to follow the same feature for a different runner, I
> can route a new ticket to the right person.
>
> Kenn
>
> [1] https://issues.apache.org/jira/browse/BEAM-2507
> [2] https://issues.apache.org/jira/browse/BEAM-3686
>

Awesome, I'm glad there is interest in adding this feature; it really fits
our use case. (And for the moment we are only interested in the two runners
you mentioned.)

Thanks,
Maurizio

Re: Stateful processing with session window

Posted by Kenneth Knowles <kl...@google.com>.
Hi Maurizio,

What runner are you using? Are you trying this out in the DirectRunner? As
far as I know, no runner supports stateful processing in session windows
yet. It is probably a bug that your pipeline was accepted by the runner
when it includes features that the runner cannot execute. It would be a
real mistake to have missed this validation for the DirectRunner.

Support for stateful processing in merging windows is definitely important.
There's only a Jira filed for Dataflow [1] as far as I can tell. I just
cloned it for the DirectRunner [2] since that is how you would test your
pipeline. If you want to follow the same feature for a different runner, I
can route a new ticket to the right person.

Kenn

[1] https://issues.apache.org/jira/browse/BEAM-2507
[2] https://issues.apache.org/jira/browse/BEAM-3686

