Posted to user@beam.apache.org by Juan Carlos Garcia <jc...@gmail.com> on 2018/07/19 13:11:58 UTC

Generating a window identifier while using a Trigger AfterProcessingTime.pastFirstElementInPane()

Hi folks,

I would like to ask whether it is possible to be notified when a window is
created or closed while processing a batch of data. (Sorry for the long
post.)

My scenario:
I am using a session window with a gap duration of 2 minutes (for testing).
During processing we assign a session identifier to the incoming messages
so that we can identify them later in Elasticsearch and other tools. This
works as expected as long as we don't introduce any trigger: in
@ProcessElement we receive the Iterable of elements for the window, and
from there we can simply generate our session identifier, e.g.:

<code>
PCollection<KV<Long, Iterable<String>>> windowedResult = input
    .apply("Session",
        Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(2))))
    .apply("Create KV of Users", ParDo.of(new CreateMyKV()))
    .apply(GroupByKey.create())
    .apply(ParDo.of(new DoFn<KV<Long, Iterable<String>>, KV<Long, Iterable<String>>>() {
        @ProcessElement
        public void processElement(ProcessContext c, BoundedWindow _window) {
            System.out.println("-- window:" + _window);
            System.out.println("session:" + UUID.randomUUID().toString());
            System.out.println(c.element().getValue());
            System.out.println("--");
            c.output(c.element());
        }
    }));
</code>

After I added the trigger "AfterProcessingTime.pastFirstElementInPane()",
each fired pane no longer carries any indication of the window it belongs
to, and I couldn't find a way to hook into it and generate one session
identifier for all the elements that belong to the same window.

@StartBundle fires for each pane, and @Setup is not consistent: sometimes
it fires more times than we have windows, sometimes fewer.

Any advice on this matter is welcome. By the way, in production we are
using the SparkRunner (which, according to the capability matrix, only
supports processing-time triggers). Please find below a JUnit class I am
using to validate this behavior.

<code>
import com.jsoniter.JsonIterator;
import com.jsoniter.any.Any;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;

public class SessionWindowTest {
    private long TIME = System.currentTimeMillis();

    @Test
    public void testSessionWindowWorkAsExpected() {
        final List<String> testMessages = new LinkedList<>();
        TIME = System.currentTimeMillis();

        //
        // 3 windows of data: three bursts of 10 events, separated by
        // pauses (5 and 6 minutes) longer than the 2-minute session gap
        //

        IntStream.range(0, 10).forEach(i -> {
            testMessages.add("{\"user_id\":123456789, \"event_type\":\"watch_tutorial\", \"time\":" + (TIME) + "}");
            TIME += TimeUnit.SECONDS.toMillis(1);
        });
        TIME += TimeUnit.MINUTES.toMillis(5);
        IntStream.range(0, 10).forEach(i -> {
            testMessages.add("{\"user_id\":123456789, \"event_type\":\"watch_tutorial\", \"time\":" + (TIME) + "}");
            TIME += TimeUnit.SECONDS.toMillis(2);
        });
        TIME += TimeUnit.MINUTES.toMillis(6);
        IntStream.range(0, 10).forEach(i -> {
            testMessages.add("{\"user_id\":123456789, \"event_type\":\"watch_tutorial\", \"time\":" + (TIME) + "}");
            TIME += TimeUnit.SECONDS.toMillis(4);
        });

        Pipeline pipe = Pipeline.create();
        PCollection<String> input = pipe.apply("Create", Create.of(testMessages));

        PCollection<KV<Long, Iterable<String>>> windowedResult = input
                // Use the "time" field of each JSON message as its event timestamp.
                .apply("setting the time",
                        ParDo.of(new DoFn<String, String>() {
                            @ProcessElement
                            public void processElement(ProcessContext c) {
                                Any deserialize = JsonIterator.deserialize(c.element());
                                c.outputWithTimestamp(c.element(),
                                        new Instant(deserialize.get("time").toLong()));
                            }
                        }))
                .apply("Session",
                        Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(2)))
                        // Uncomment the three lines below to enable the trigger
                        // (this also needs an import of AfterProcessingTime):
//                                .withAllowedLateness(Duration.standardSeconds(1))
//                                .discardingFiredPanes()
//                                .triggering(AfterProcessingTime.pastFirstElementInPane())
                )
                // The original post referenced CreateUserKV here; the class
                // defined below is CreateMyKV.
                .apply("Create KV of Users", ParDo.of(new CreateMyKV()))
                .apply(GroupByKey.create())
                .apply(ParDo.of(new DoFn<KV<Long, Iterable<String>>, KV<Long, Iterable<String>>>() {
                    private int counter = 0;

                    @StartBundle
                    public void startBundle() {
                        System.out.println("--BUNDLE--");
                    }

                    @Setup
                    public void setupFn() {
                        System.out.println("--SETUP--");
                    }

                    @ProcessElement
                    public void processElement(ProcessContext c, BoundedWindow _window) {
                        System.out.println("-- window:" + _window);
                        System.out.println("session:" + UUID.randomUUID().toString());
                        //System.out.println(c.element().getValue());
                        System.out.println("--");
                        c.output(c.element());
                    }
                }));

        PipelineResult run = pipe.run();
        Assert.assertTrue("Pipeline is done", run.getState() == PipelineResult.State.DONE);
    }

    // Keys each JSON message by its "user_id" field.
    static class CreateMyKV extends DoFn<String, KV<Long, String>> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            Any deserialize = JsonIterator.deserialize(c.element());
            Map<String, Any> stringAnyMap = deserialize.asMap();
            c.output(KV.of(stringAnyMap.get("user_id").toLong(), c.element()));
        }
    }
}
</code>
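
For readers checking the "3 windows of data" comment in the test, here is a
minimal plain-Java sketch (not Beam code) of the gap rule that session
windowing applies to the events of a single key. The class and method names
are hypothetical. It assumes two consecutive events share a session when
they are less than the gap duration apart, and it rebuilds the same
timestamp pattern as the test (bursts with 1 s, 2 s and 4 s steps, separated
by pauses of 5 and 6 minutes).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

final class SessionGapSketch {

    // Rebuilds the timestamp pattern of the JUnit test: three bursts of
    // 10 events, stepping 1 s, 2 s and 4 s, with 5 and 6 minute pauses.
    static List<Long> sampleTimestamps(long startMillis) {
        List<Long> out = new ArrayList<>();
        long t = startMillis;
        long[] stepSeconds = {1, 2, 4};
        long[] pauseMinutes = {5, 6, 0};
        for (int burst = 0; burst < 3; burst++) {
            for (int i = 0; i < 10; i++) {
                out.add(t);
                t += TimeUnit.SECONDS.toMillis(stepSeconds[burst]);
            }
            t += TimeUnit.MINUTES.toMillis(pauseMinutes[burst]);
        }
        return out;
    }

    // Splits ascending timestamps into sessions. A new session starts when
    // the distance to the previous event is at least gapMillis.
    static List<List<Long>> sessionize(List<Long> sortedMillis, long gapMillis) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long t : sortedMillis) {
            if (!current.isEmpty() && t - current.get(current.size() - 1) >= gapMillis) {
                sessions.add(current);
                current = new ArrayList<>();
            }
            current.add(t);
        }
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        List<List<Long>> sessions =
                sessionize(sampleTimestamps(0L), TimeUnit.MINUTES.toMillis(2));
        System.out.println(sessions.size()); // prints 3
    }
}
```

With a 2-minute gap, the 5 and 6 minute pauses each start a new session, so
the test data should indeed end up in three session windows when no trigger
is configured.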

-- 

JC

Re: Generating a window identifier while using a Trigger AfterProcessingTime.pastFirstElementInPane()

Posted by Juan Carlos Garcia <jc...@gmail.com>.
I looked into PaneInfo, but unfortunately it doesn't contain any
information about the session window that would be useful to me (at least
not enough to tag all the events belonging to the same session with a
session ID). :(

I have no idea about Beam internals or how complex this would be to
implement, but it would be cool if such a hook or piece of information (a
window identifier) existed.

Thanks!
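
One possible workaround, sketched here as an assumption rather than
something confirmed in this thread: instead of a random UUID, derive the
session identifier deterministically from the user key and the bounds of
the window passed to @ProcessElement. In a Beam DoFn the bounds would
presumably come from casting the BoundedWindow to IntervalWindow and reading
start() and end(); the plain-Java helper below (hypothetical names) takes
them as epoch milliseconds so it can run on its own.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

final class SessionIdSketch {

    // Builds a name-based UUID from the user key and the window bounds.
    // The same three inputs always yield the same identifier, so every pane
    // that reports the same window for the same user gets the same ID.
    static String sessionId(long userId, long windowStartMillis, long windowEndMillis) {
        String seed = userId + "|" + windowStartMillis + "|" + windowEndMillis;
        return UUID.nameUUIDFromBytes(seed.getBytes(StandardCharsets.UTF_8)).toString();
    }

    public static void main(String[] args) {
        // Hypothetical window bounds, for illustration only.
        String first = sessionId(123456789L, 0L, 129_000L);
        String again = sessionId(123456789L, 0L, 129_000L);
        System.out.println(first.equals(again)); // prints true
    }
}
```

A caveat to keep in mind: session windows merge as more elements arrive, so
a pane fired early may report narrower bounds than a later pane of what ends
up being the same session. In that case the identifiers would differ, and
this approach only helps if the bounds are stable across the panes you care
about.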

On Thu, Jul 19, 2018 at 10:18 PM, Lukasz Cwik <lc...@google.com> wrote:

> Note that @StartBundle is not correlated with a new pane. A bundle is an
> arbitrary, runner-chosen group of elements that can span any number of
> windows, with its contents restricted only by the triggering semantics.
>
> You can inspect the PaneInfo to see the firing index; index 0 represents
> the first firing. I don't believe there is a way to know which firing is
> the last one without using a trigger that produces a known number of
> firings (e.g. a watermark trigger with no speculative or late firings).
>


-- 

JC

Re: Generating a window identifier while using a Trigger AfterProcessingTime.pastFirstElementInPane()

Posted by Lukasz Cwik <lc...@google.com>.
Note that @StartBundle is not correlated with a new pane. A bundle is an
arbitrary, runner-chosen group of elements that can span any number of
windows, with its contents restricted only by the triggering semantics.

You can inspect the PaneInfo to see the firing index; index 0 represents
the first firing. I don't believe there is a way to know which firing is
the last one without using a trigger that produces a known number of
firings (e.g. a watermark trigger with no speculative or late firings).
