Posted to user@beam.apache.org by Jacob Marble <jm...@kochava.com> on 2017/11/09 01:33:43 UTC

Windowing in a batch pipeline

Good evening. I'm trying to nail down windowing. The concept is clear; I'm
just struggling to write a working pipeline. Tonight the goal is to group
events by key and window in a batch pipeline. All data is "late" because
it's a batch pipeline, and I expect nothing to be dropped or processed in a
"late" context.

I've read sections 7 and 8 of the Beam Programming Guide roughly twice and
sifted through the examples. WindowedWordCount is close, but it doesn't use
triggering, which is where (2b) is probably off track.

1)
The PCollection is created through a series of transforms, including a
Join.leftOuterJoin(). I apply a timestamp with something simple:

collection.apply("add window timestamp",
 ParDo.of(new DoFn<Foo, Foo>() {
  @ProcessElement
  public void map(ProcessContext context) {
   Foo element = context.element();
   Instant timestamp = new Instant(element.getActivityUnixSeconds() * 1000);
   context.outputWithTimestamp(element, timestamp);
  }
 }));

This fails with "java.lang.IllegalArgumentException: Cannot output with
timestamp 2017-04-01T00:00:00.000Z. Output timestamps must be no earlier
than the timestamp of the current input (294247-01-09T04:00:54.775Z) minus
the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
Javadoc for details on changing the allowed skew."

*Is this expected? I don't care about skew, just want to set the timestamp
per element.*

I worked around this by applying the timestamp earlier in the pipeline,
right after a TextIO.read(). Why does that fix the problem?

2a)
After applying the timestamp, let's window!

collection.apply("window into sessions",
 Window.<Foo>into(Sessions.withGapDuration(Duration.standardMinutes(10))))
 .apply("key by something, reduce")
 .apply(TextIO.write()...)

Now I see an output file, what joy! *But the output file is empty.* I
confirmed that the PCollection feeding TextIO.write() is seeing data. Maybe
this is because the default trigger is incorrect for my use case? I expected
not to need triggering in a batch context, but the DefaultTrigger Javadoc
makes me believe otherwise.
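
For reference, the grouping that a 10-minute session window is expected to produce for a single key can be sketched without Beam. This is only an illustration in plain Java: the class and method names are hypothetical, and it assumes that each event opens a window of [t, t + gap) and that overlapping windows merge while windows that merely touch do not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-alone sketch of session grouping for one key; not Beam code. */
final class SessionSketch {
  static final long GAP_MILLIS = 10 * 60 * 1000L; // 10-minute gap, as in the pipeline above

  /** Groups event timestamps (epoch millis) into sessions separated by at least GAP_MILLIS. */
  static List<List<Long>> sessions(long[] timestamps) {
    long[] sorted = timestamps.clone();
    Arrays.sort(sorted);
    List<List<Long>> result = new ArrayList<>();
    List<Long> current = null;
    long sessionEnd = 0L; // exclusive end of the session being built
    for (long t : sorted) {
      // Start a new session when this event falls at or after the current session's end.
      if (current == null || t >= sessionEnd) {
        current = new ArrayList<>();
        result.add(current);
      }
      current.add(t);
      sessionEnd = t + GAP_MILLIS; // each event extends the session by one gap
    }
    return result;
  }

  public static void main(String[] args) {
    // Events at 0 min, 5 min and 20 min.
    System.out.println(sessions(new long[] {0L, 300_000L, 1_200_000L}));
  }
}
```

With events at 0, 5 and 20 minutes, the first two share a session and the third starts a new one. If every element carries the same timestamp, everything for a key collapses into a single session.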

2b)
How about the Never.ever() trigger? Javadoc: "Using this trigger will only
produce output when the watermark passes the end of the {@link
BoundedWindow window}". I don't know, but let's try. There's some error
about allowed lateness and firing panes, so I'll try values that look
standard:

collection.apply("window into sessions",
 Window.<Foo>into(Sessions.withGapDuration(Duration.standardMinutes(10)))
  .triggering(Never.ever()).withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes())
 .apply("key by something, reduce")
 .apply(TextIO.write()...)

This yields a new error:
"java.lang.IllegalStateException: TimestampCombiner moved element from
294247-01-09T04:10:54.774Z to earlier time 294247-01-09T04:00:54.775Z (end
of global window) for window org.apache.beam.sdk.transforms
.windowing.GlobalWindow"

So I'm probably looking in the wrong place.

Thanks!

Jacob

Re: Windowing in a batch pipeline

Posted by Jacob Marble <jm...@kochava.com>.
Thanks Robert, here's what I did with your advice.

After the early Join/GBK transformations:

collection
 .apply(WithTimestamps.of(...).withAllowedTimestampSkew(new Duration(Long.MAX_VALUE/10)))
 .apply(Window.<GeoPoiDaily>into(Sessions.withGapDuration(Duration.standardMinutes(10)))
   .withAllowedLateness(new Duration(Long.MAX_VALUE/10)).discardingFiredPanes())
 .apply(WithKeys.of(...))
 .apply(Combine.perKey(...))
 .apply(TextIO.write()...);

I used Long.MAX_VALUE/10 rather than Long.MAX_VALUE to prevent an overflow
error; Long.MAX_VALUE/2 also works.
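
The thread does not say where the overflow occurs inside Beam, so the following is only a generic illustration of why a duration of Long.MAX_VALUE milliseconds is fragile: adding it to any positive timestamp no longer fits in a signed 64-bit long, while the /10 and /2 variants leave headroom. The class and method names are hypothetical; the event time is the 2017-04-01 timestamp from the original error message.

```java
/** Hypothetical demo of long overflow when a huge duration is added to a timestamp. */
final class SkewOverflow {
  // 2017-04-01T00:00:00Z in epoch milliseconds (the timestamp from the error message).
  static final long EVENT_MILLIS = 1_491_004_800_000L;

  /** Returns true when timestamp + duration does not fit in a signed 64-bit long. */
  static boolean overflows(long timestampMillis, long durationMillis) {
    try {
      Math.addExact(timestampMillis, durationMillis);
      return false;
    } catch (ArithmeticException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("MAX_VALUE:    " + overflows(EVENT_MILLIS, Long.MAX_VALUE));
    System.out.println("MAX_VALUE/2:  " + overflows(EVENT_MILLIS, Long.MAX_VALUE / 2));
    System.out.println("MAX_VALUE/10: " + overflows(EVENT_MILLIS, Long.MAX_VALUE / 10));
  }
}
```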

Jacob

On Wed, Nov 8, 2017 at 5:54 PM, Robert Bradshaw <ro...@google.com> wrote:

> On Wed, Nov 8, 2017 at 5:33 PM, Jacob Marble <jm...@kochava.com> wrote:
> > Good evening. I'm trying to nail down windowing. The concept is clear, just
> > struggling with writing a working pipeline. Tonight the goal is group events
> > by key and window, in a batch pipeline. All data is "late" because it's a
> > batch pipeline, and I expect nothing to be dropped or processed in a "late"
> > context.
>
> Traditionally, in a batch pipeline we consider no data to be late, as
> we have perfect knowledge of the watermark.
>
> > Read section 7 and 8 of the Beam Programming Guide roughly twice.
> > Sifted through the examples, WindowedWordCount is close, but it doesn't use
> > triggering, which is where (2b) is probably off track.
> >
> > 1)
> > PCollection is created through a series of transforms, including a
> > Join.leftOuterJoin(). Apply a timestamp with something simple:
> >
> > collection.apply("add window timestamp",
> >  ParDo.of(new DoFn<Foo, Foo>() {
> >   @ProcessElement
> >   public void map(ProcessContext context) {
> >    Foo element = context.element();
> >    Instant timestamp = new Instant(element.getActivityUnixSeconds() * 1000);
> >    context.outputWithTimestamp(element, timestamp);
> >   }
> >  }));
> >
> > This fails with "java.lang.IllegalArgumentException: Cannot output with
> > timestamp 2017-04-01T00:00:00.000Z. Output timestamps must be no earlier
> > than the timestamp of the current input (294247-01-09T04:00:54.775Z) minus
> > the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
> > Javadoc for details on changing the allowed skew."
> >
> > Is this expected? I don't care about skew, just want to set the timestamp
> > per element.
> >
> > I worked around this by applying the timestamp earlier in the pipeline,
> > right after a TextIO.read(). Why does that fix the problem?
>
> I would suspect that very-far-in-the-future timestamp is the end of
> the global window, set as the timestamp as the result of a
> group-by-key.
>
> You can set your timestamps earlier, as you have done, but in this
> case they will get reset after passing through any GBK. It's possible
> you could get what you want by setting TimestampCombiner to EARLIEST
> (see https://github.com/apache/beam/blob/v2.1.1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java#L47)
> but probably the right solution is to set the allowed timestamp skew
> to infinity (or Long.MAX_VALUE or similar).
>
> Generally this skew is needed in streaming to hold the watermark back
> the right amount... Definitely not intuitive in your case; we should
> think if there's something better we could do here.
>
> > 2a)
> > After applying the timestamp, let's window!
> >
> > collection.apply("window into sessions",
> >  Window.<Foo>into(Sessions.withGapDuration(Duration.standardMinutes(10))))
> >  .apply("key by something, reduce")
> >  .apply(TextIO.write()...)
> >
> > Now I see an output file, what joy! But the output file is empty. Confirmed
> > that the PCollection feeding TextIO.write() is seeing data. Maybe this is
> > because the default trigger is incorrect for my use case? I expected not to
> > need triggering in batch context, but the DefaultTrigger Javadoc makes me
> > believe otherwise.
> >
> > 2b)
> > How about the Never.ever() trigger? Javadoc: "Using this trigger will only
> > produce output when the watermark passes the end of the {@link BoundedWindow
> > window}". I don't know, but let's try. There's some error about allowed
> > lateness and firing panes, so I'll try values that look standard:
> >
> > collection.apply("window into sessions",
> >  Window.<Foo>into(Sessions.withGapDuration(Duration.standardMinutes(10)))
> >   .triggering(Never.ever()).withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes())
> >  .apply("key by something, reduce")
> >  .apply(TextIO.write()...)
> >
> > This yields a new error:
> > "java.lang.IllegalStateException: TimestampCombiner moved element from
> > 294247-01-09T04:10:54.774Z to earlier time 294247-01-09T04:00:54.775Z (end
> > of global window) for window
> > org.apache.beam.sdk.transforms.windowing.GlobalWindow"
> >
> > So I'm probably looking in the wrong place.
>
> I think if you resolve the issues above than this will take care of itself.
>
> - Robert
>

Re: Windowing in a batch pipeline

Posted by Robert Bradshaw <ro...@google.com>.
On Wed, Nov 8, 2017 at 5:33 PM, Jacob Marble <jm...@kochava.com> wrote:
> Good evening. I'm trying to nail down windowing. The concept is clear, just
> struggling with writing a working pipeline. Tonight the goal is group events
> by key and window, in a batch pipeline. All data is "late" because it's a
> batch pipeline, and I expect nothing to be dropped or processed in a "late"
> context.

Traditionally, in a batch pipeline we consider no data to be late, as
we have perfect knowledge of the watermark.

> Read section 7 and 8 of the Beam Programming Guide roughly twice.
> Sifted through the examples, WindowedWordCount is close, but it doesn't use
> triggering, which is where (2b) is probably off track.
>
> 1)
> PCollection is created through a series of transforms, including a
> Join.leftOuterJoin(). Apply a timestamp with something simple:
>
> collection.apply("add window timestamp",
>  ParDo.of(new DoFn<Foo, Foo>() {
>   @ProcessElement
>   public void map(ProcessContext context) {
>    Foo element = context.element();
>    Instant timestamp = new Instant(element.getActivityUnixSeconds() * 1000);
>    context.outputWithTimestamp(element, timestamp);
>   }
>  }));
>
> This fails with "java.lang.IllegalArgumentException: Cannot output with
> timestamp 2017-04-01T00:00:00.000Z. Output timestamps must be no earlier
> than the timestamp of the current input (294247-01-09T04:00:54.775Z) minus
> the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
> Javadoc for details on changing the allowed skew."
>
> Is this expected? I don't care about skew, just want to set the timestamp
> per element.
>
> I worked around this by applying the timestamp earlier in the pipeline,
> right after a TextIO.read(). Why does that fix the problem?

I suspect that the very-far-in-the-future timestamp is the end of the
global window, which gets assigned as the element timestamp as the result
of a group-by-key.
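
The far-future value in the error message can be reproduced with plain java.time arithmetic, which supports the guess above. This sketch rests on two assumptions that are not stated in this thread: that the largest timestamp Beam represents is Long.MAX_VALUE microseconds truncated to milliseconds, and that the global window ends one day before that maximum. The class name is hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: derives the "end of global window" timestamp without Beam. */
final class GlobalWindowEnd {
  // Assumption: maximum representable timestamp is Long.MAX_VALUE microseconds, in millis.
  static final Instant MAX_TIMESTAMP =
      Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));

  // Assumption: the global window ends one day before the maximum timestamp.
  static final Instant END_OF_GLOBAL_WINDOW = MAX_TIMESTAMP.minus(Duration.ofDays(1));

  public static void main(String[] args) {
    System.out.println("max timestamp:        " + MAX_TIMESTAMP);
    System.out.println("end of global window: " + END_OF_GLOBAL_WINDOW);
  }
}
```

Under those assumptions the second value is 294247-01-09T04:00:54.775Z, the same instant the exception reports as the timestamp of the current input.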

You can set your timestamps earlier, as you have done, but in this
case they will get reset after passing through any GBK. It's possible
you could get what you want by setting TimestampCombiner to EARLIEST
(see https://github.com/apache/beam/blob/v2.1.1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java#L47)
but probably the right solution is to set the allowed timestamp skew
to infinity (or Long.MAX_VALUE or similar).
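
The rule stated in the error message (output timestamp no earlier than input timestamp minus allowed skew) can be written out as a small check. This is a sketch of the rule as quoted, not Beam's implementation; the names are hypothetical, and the minimum-timestamp constant assumes that elements read from a bounded source such as TextIO carry a very early default timestamp.

```java
/** Hypothetical sketch of the timestamp-skew rule quoted in the error message. */
final class SkewCheck {
  // Input timestamp reported in the error (end of the global window), epoch millis.
  static final long END_OF_GLOBAL_WINDOW = 9_223_371_950_454_775L;
  // Assumption: default timestamp of elements fresh from a bounded source.
  static final long MIN_TIMESTAMP = Long.MIN_VALUE / 1000;
  // 2017-04-01T00:00:00Z, the event time the DoFn tried to assign.
  static final long EVENT = 1_491_004_800_000L;

  /** True when output >= input - allowedSkew, i.e. the backward shift fits within the skew. */
  static boolean isAllowed(long inputMillis, long outputMillis, long allowedSkewMillis) {
    // Written as a difference so that a skew of Long.MAX_VALUE cannot overflow;
    // the difference itself is safe for timestamps within the +/- 9.2e15 ms range.
    long backwardShift = inputMillis - outputMillis;
    return backwardShift <= allowedSkewMillis;
  }

  public static void main(String[] args) {
    System.out.println("after GBK, no skew:       " + isAllowed(END_OF_GLOBAL_WINDOW, EVENT, 0L));
    System.out.println("after read, no skew:      " + isAllowed(MIN_TIMESTAMP, EVENT, 0L));
    System.out.println("after GBK, unbounded skew: " + isAllowed(END_OF_GLOBAL_WINDOW, EVENT, Long.MAX_VALUE));
  }
}
```

Read this way, stamping elements right after the read works because the event time only moves forward from the source's default, while stamping after a group-by-key has to move the timestamp backward from the end of the global window, which a zero skew forbids.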

Generally this skew is needed in streaming to hold the watermark back
the right amount... Definitely not intuitive in your case; we should
think about whether there's something better we could do here.

> 2a)
> After applying the timestamp, let's window!
>
> collection.apply("window into sessions",
>  Window.<Foo>into(Sessions.withGapDuration(Duration.standardMinutes(10))))
>  .apply("key by something, reduce")
>  .apply(TextIO.write()...)
>
> Now I see an output file, what joy! But the output file is empty. Confirmed
> that the PCollection feeding TextIO.write() is seeing data. Maybe this is
> because the default trigger is incorrect for my use case? I expected not to
> need triggering in batch context, but the DefaultTrigger Javadoc makes me
> believe otherwise.
>
> 2b)
> How about the Never.ever() trigger? Javadoc: "Using this trigger will only
> produce output when the watermark passes the end of the {@link BoundedWindow
> window}". I don't know, but let's try. There's some error about allowed
> lateness and firing panes, so I'll try values that look standard:
>
> collection.apply("window into sessions",
>  Window.<Foo>into(Sessions.withGapDuration(Duration.standardMinutes(10)))
>
> .triggering(Never.ever()).withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes())
>  .apply("key by something, reduce")
>  .apply(TextIO.write()...)
>
> This yields a new error:
> "java.lang.IllegalStateException: TimestampCombiner moved element from
> 294247-01-09T04:10:54.774Z to earlier time 294247-01-09T04:00:54.775Z (end
> of global window) for window
> org.apache.beam.sdk.transforms.windowing.GlobalWindow"
>
> So I'm probably looking in the wrong place.

I think if you resolve the issues above then this will take care of itself.
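
One plausible reading of the 2b error, offered as a sketch rather than a diagnosis: if an element still carries the end-of-global-window timestamp when it is assigned to a 10-minute session, the last instant inside that session is the first timestamp in the message. The code assumes a session window spans [start, start + gap) and that its maximum timestamp is one millisecond before its end; the names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch: session window built from an element stamped at the global window's end. */
final class SessionAtGlobalWindowEnd {
  // The instant the error message labels "end of global window", in epoch millis.
  static final Instant END_OF_GLOBAL_WINDOW = Instant.ofEpochMilli(9_223_371_950_454_775L);
  static final Duration GAP = Duration.ofMinutes(10);

  /** Last timestamp inside a session window that starts at the given instant. */
  static Instant sessionMaxTimestamp(Instant start) {
    return start.plus(GAP).minusMillis(1);
  }

  public static void main(String[] args) {
    System.out.println("element timestamp:     " + END_OF_GLOBAL_WINDOW);
    System.out.println("session max timestamp: " + sessionMaxTimestamp(END_OF_GLOBAL_WINDOW));
  }
}
```

The computed session maximum is 294247-01-09T04:10:54.774Z, which matches the "moved element from" value, so assigning real event timestamps before windowing should remove the conflict.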

- Robert