Posted to user@beam.apache.org by Mohil Khare <mo...@prosimo.io> on 2020/08/06 07:42:55 UTC

Session Windowing followed by CoGroupByKey

Hello All,

I am seeking advice on whether Session Windowing followed by CoGroupByKey is
the correct way to solve my use case and, if so, whether I am using it
correctly.
Please note that I am using Java SDK 2.19 on Google Dataflow.

I have two streams of data coming from two different Kafka topics, and I
need to correlate them using the common key present in both. I expect all
the logs for a key to arrive within 90 seconds on both topics, and hence I
decided to use a session window.

1. Read data from each Kafka topic as follows:

PCollection<KV<MyKey, POJO1>> collection1 =
    p
        .apply("Read_From_Kafka", KafkaIO.<String, byte[]>read()
            .withBootstrapServers(servers)
            .withTopics(Arrays.asList("topic1"))
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            .withConsumerConfigUpdates(kafkaConsumerProperties)
            .withConsumerFactoryFn(consumerFactoryObj)
            .commitOffsetsInFinalize())
        .apply("Applying_Fixed_Window",
            Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .withAllowedLateness(Duration.standardSeconds(360))
                .discardingFiredPanes())
        .apply("Convert_KafkaRecord_To_PCollection<POJO>",
            ParDo.of(new ParseLogsPOJO1()));


PCollection<KV<MyKey, POJO2>> collection2 =
    p
        .apply("Read_From_Kafka", KafkaIO.<String, byte[]>read()
            .withBootstrapServers(servers)
            .withTopics(Arrays.asList("topic2"))
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            .withConsumerConfigUpdates(kafkaConsumerProperties)
            .withConsumerFactoryFn(consumerFactoryObj)
            .commitOffsetsInFinalize())
        .apply("Applying_Fixed_Window",
            Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .withAllowedLateness(Duration.standardSeconds(360))
                .discardingFiredPanes())
        .apply("Convert_KafkaRecord_To_PCollection<POJO>",
            ParDo.of(new ParseLogsPOJO2()));
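
For completeness, ParseLogsPOJO1 / ParseLogsPOJO2 simply deserialize the
Kafka payload and key each record by MyKey. A minimal sketch of
ParseLogsPOJO1 (deserializePOJO1() and getMyKey() are stand-ins for my
actual parsing code):

static class ParseLogsPOJO1 extends DoFn<KafkaRecord<String, byte[]>, KV<MyKey, POJO1>> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        // Deserialize the raw Kafka payload into the POJO.
        POJO1 log = deserializePOJO1(c.element().getKV().getValue());
        // Key each record by the correlation key that is common to both topics.
        c.output(KV.of(log.getMyKey(), log));
    }
}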


2. Put each of the above collections into a session window with a
90-second gap duration:


PCollection<KV<MyKey, POJO1>> sessionWindowedPOJO1 =
    collection1
        .apply("Applying_Session_Window",
            Window.<KV<MyKey, POJO1>>into(Sessions.withGapDuration(Duration.standardSeconds(90)))
                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes());

PCollection<KV<MyKey, POJO2>> sessionWindowedPOJO2 =
    collection2
        .apply("Applying_Session_Window",
            Window.<KV<MyKey, POJO2>>into(Sessions.withGapDuration(Duration.standardSeconds(90)))
                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes());
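
(MyKey is a small serializable POJO shared by both streams; since it is
used as a grouping key, it has a deterministic coder and implements
equals()/hashCode(). Roughly, with the real fields elided:)

@DefaultCoder(AvroCoder.class)
public static class MyKey implements Serializable {
    String logId;  // illustrative field; the real key has a few more fields

    @Override
    public boolean equals(Object o) {
        return o instanceof MyKey && Objects.equals(logId, ((MyKey) o).logId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(logId);
    }
}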


3. CoGroupByKey the two collections and get the correlated logs:


PCollection<CorrelatedPOJO> coGbkLogs =
    KeyedPCollectionTuple.of("tag1", sessionWindowedPOJO1)
        .and("tag2", sessionWindowedPOJO2)
        .apply("CoGroupByMyKey", CoGroupByKey.create())
        .apply("Correlate_Logs_PerLogID", ParDo.of(new Correlate()));




   Is this a correct way to solve my use case?


Looking forward to hearing from someone soon.


Thanks and Regards

Mohil

Re: Session Windowing followed by CoGroupByKey

Posted by Mohil Khare <mo...@prosimo.io>.
Hello,

The ParDos in my Kafka read stage keep up with the timestamps of the logs
and read them in real time without any lag.
However, it sometimes takes up to 3-4 minutes for data to show up in
ParDo(new Correlate()) after the session window followed by CoGroupByKey.
I am definitely doing something wrong. Any suggestions?
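
Could this be because, with only AfterWatermark.pastEndOfWindow(), a
session pane fires only once the watermark has passed 90 seconds beyond
the last element for that key? Would adding early firings along these
lines help (just a sketch; the 30-second delay is an arbitrary guess)?

Window.<KV<MyKey, POJO1>>into(Sessions.withGapDuration(Duration.standardSeconds(90)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(30))))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes();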

Thanks and regards
Mohil


