Posted to user@beam.apache.org by Mohil Khare <mo...@prosimo.io> on 2020/08/06 07:42:55 UTC
Session Windowing followed by CoGroupByKey
Hello All,
I would like advice on whether session windowing followed by CoGroupByKey
is the right way to solve my use case and, if so, whether I am using it
correctly.
Please note that I am using Java SDK 2.19 on Google Dataflow.
I have two streams of data coming from two different Kafka topics, and I
need to correlate them using a common key present in both. I expect all
the logs for a key to arrive within 90 seconds in both topics, which is
why I decided to use a session window.
1. Read data from each Kafka topic as follows:

    PCollection<KV<MyKey, POJO1>> collection1 =
        p
            .apply("Read_From_Kafka", KafkaIO.<String, byte[]>read()
                .withBootstrapServers(servers)
                .withTopics(Arrays.asList("topic1"))
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                .withConsumerConfigUpdates(kafkaConsumerProperties)
                .withConsumerFactoryFn(consumerFactoryObj)
                .commitOffsetsInFinalize())
            // 10-second fixed windows that fire on every element.
            .apply("Applying_Fixed_Window",
                Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                    .withAllowedLateness(Duration.standardSeconds(360))
                    .discardingFiredPanes())
            // Parse each KafkaRecord into a KV<MyKey, POJO1>.
            .apply("Convert_KafkaRecord_To_PCollection<POJO>",
                ParDo.of(new ParseLogsPOJO1()));

    PCollection<KV<MyKey, POJO2>> collection2 =
        p
            .apply("Read_From_Kafka", KafkaIO.<String, byte[]>read()
                .withBootstrapServers(servers)
                .withTopics(Arrays.asList("topic2"))
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                .withConsumerConfigUpdates(kafkaConsumerProperties)
                .withConsumerFactoryFn(consumerFactoryObj)
                .commitOffsetsInFinalize())
            .apply("Applying_Fixed_Window",
                Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                    .withAllowedLateness(Duration.standardSeconds(360))
                    .discardingFiredPanes())
            // Parse each KafkaRecord into a KV<MyKey, POJO2>.
            .apply("Convert_KafkaRecord_To_PCollection<POJO>",
                ParDo.of(new ParseLogsPOJO2()));

2. Put each of the above collections in a session window with a gap
duration of 90 seconds:

    PCollection<KV<MyKey, POJO1>> sessionWindowedPOJO1 =
        collection1
            .apply("Applying_Session_Window",
                Window.<KV<MyKey, POJO1>>into(Sessions.withGapDuration(Duration.standardSeconds(90)))
                    // Fire only once the watermark passes the end of the session.
                    .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                    .withAllowedLateness(Duration.ZERO)
                    .discardingFiredPanes());

    PCollection<KV<MyKey, POJO2>> sessionWindowedPOJO2 =
        collection2
            .apply("Applying_Session_Window",
                Window.<KV<MyKey, POJO2>>into(Sessions.withGapDuration(Duration.standardSeconds(90)))
                    .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                    .withAllowedLateness(Duration.ZERO)
                    .discardingFiredPanes());

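To make the effect of the 90-second gap in step 2 concrete, here is a minimal sketch in plain Java (no Beam dependency) of how per-key session windows grow: each element starts a window of [timestamp, timestamp + gap), and overlapping windows for the same key are merged. The class and method names are hypothetical, timestamps are plain seconds, and treating exactly abutting windows as separate is an assumption of this sketch rather than a statement about Beam's behaviour.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; this is not Beam code.
final class SessionMergeSketch {

    /** Returns merged sessions for one key as {start, end} pairs in seconds, end exclusive. */
    static List<long[]> sessions(long[] eventSeconds, long gapSeconds) {
        long[] sorted = eventSeconds.clone();
        Arrays.sort(sorted);
        List<long[]> merged = new ArrayList<>();
        for (long t : sorted) {
            long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && t < last[1]) {
                // The event falls inside the current session: extend its end.
                last[1] = Math.max(last[1], t + gapSeconds);
            } else {
                // No overlap (assumption: abutting counts as separate): start a new session.
                merged.add(new long[] {t, t + gapSeconds});
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        for (long[] s : sessions(new long[] {0, 40, 80, 300}, 90)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

With events at 0, 40, 80 and 300 seconds, this prints [0, 170) and [300, 390): the first three events chain into one session that ends 90 seconds after the last of them.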
3. CoGroupByKey and get the correlated logs:

    PCollection<CorrelatedPOJO> coGbkLogs =
        KeyedPCollectionTuple.of("tag1", sessionWindowedPOJO1)
            .and("tag2", sessionWindowedPOJO2)
            .apply("CoGroupByMyKey", CoGroupByKey.create())
            .apply("Correlate_Logs_PerLogID", ParDo.of(new Correlate()));

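The grouping in step 3 can be pictured with a plain-Java sketch (no Beam dependency): for every key, CoGroupByKey yields one result holding the values from each tagged input, and a key seen in only one input still appears, with an empty collection for the other. In Beam the grouping is additionally scoped per window; that part is left out here. CoGroupSketch, Joined and coGroup are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; this is not Beam code.
final class CoGroupSketch {

    /** Values seen for one key in each input; either list may be empty. */
    static final class Joined {
        final List<String> first = new ArrayList<>();
        final List<String> second = new ArrayList<>();
    }

    /** Each input element is a {key, value} pair. */
    static Map<String, Joined> coGroup(List<String[]> input1, List<String[]> input2) {
        Map<String, Joined> out = new TreeMap<>();
        for (String[] kv : input1) {
            out.computeIfAbsent(kv[0], k -> new Joined()).first.add(kv[1]);
        }
        for (String[] kv : input2) {
            out.computeIfAbsent(kv[0], k -> new Joined()).second.add(kv[1]);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> topic1 = Arrays.asList(new String[] {"k1", "a"}, new String[] {"k2", "b"});
        List<String[]> topic2 = Arrays.asList(new String[] {"k1", "x"}, new String[] {"k3", "y"});
        coGroup(topic1, topic2).forEach((key, joined) ->
            System.out.println(key + ": first=" + joined.first + ", second=" + joined.second));
    }
}
```

Here k1 is the only key with values on both sides; k2 and k3 each come back with one empty list, which is the case a correlating DoFn would have to decide how to handle.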
Is this a correct way to solve my use case?
Looking forward to hearing from someone soon.
Thanks and Regards
Mohil
Re: Session Windowing followed by CoGroupByKey
Posted by Mohil Khare <mo...@prosimo.io>.
Hello,
The ParDos in my Kafka read stage are pretty much keeping up with the
timestamps of the logs and are able to read them in real time without
any lag. However, it sometimes takes up to 3-4 minutes for data to show
up in ParDo(new Correlate()) after the session window followed by
CoGroupByKey.
I am definitely doing something wrong. Any suggestions?
Thanks and regards
Mohil
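
One possible reading of the delay, offered as a hypothesis rather than a diagnosis: with AfterWatermark.pastEndOfWindow() and no early firings, a session can only be emitted once the watermark passes the end of the merged window, which is the timestamp of the last log for the key plus the 90-second gap. The first log in a session therefore waits for the spread of the logs, plus the gap, plus any watermark lag. The sketch below works through that arithmetic; the 85-second spread and 30-second watermark lag are made-up numbers, and the class and method names are hypothetical.

```java
// Hypothetical illustration only; this is not Beam code.
final class SessionLatencySketch {

    /** Earliest time (seconds) at which the session for a key could fire. */
    static long earliestFiringSeconds(long lastEventSeconds, long gapSeconds, long watermarkLagSeconds) {
        // The window ends one gap after the last event; the watermark must pass that point.
        return lastEventSeconds + gapSeconds + watermarkLagSeconds;
    }

    /** How long the first event of the session waits before it can be emitted. */
    static long waitOfFirstEventSeconds(
            long firstEventSeconds, long lastEventSeconds, long gapSeconds, long watermarkLagSeconds) {
        return earliestFiringSeconds(lastEventSeconds, gapSeconds, watermarkLagSeconds) - firstEventSeconds;
    }

    public static void main(String[] args) {
        // Logs for one key spread over 85 s, 90 s gap, assumed 30 s watermark lag.
        System.out.println(waitOfFirstEventSeconds(0, 85, 90, 30));
    }
}
```

This prints 205, that is, a little under three and a half minutes for the first log of the session, which is in the same range as the 3-4 minutes observed.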