You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Luke Cwik <lc...@google.com> on 2020/06/01 22:38:35 UTC

Re: Need suggestion/help for use case (usage of the side input pattern and sliding window)

Your allowed lateness is 360 days and since the trigger you have doesn't
emit speculative results, you'll have to wait till the watermark advances
to the end of windows timestamp + 360 days before something is output from
the grouping aggregation/available at the side input.


On Sat, May 30, 2020 at 12:17 PM Mohil Khare <mo...@prosimo.io> wrote:

> Hello all,
>
> Any suggestions? Where am I going wrong or is there any better way of
> achieving this so that I can do replay as well ?
>
> Thanks
> Mohil
>
> On Wed, May 27, 2020 at 11:40 AM Mohil Khare <mo...@prosimo.io> wrote:
>
>> Hi everyone,
>> I need a suggestion regarding usage of the side input pattern and sliding
>> window, especially while replaying old kafka logs/offsets.
>>
>> FYI: I am running beam 2.19 on google dataflow.
>>
>> I have a use case where I read a continuous stream of data from Kafka and
>> need to calculate one score (apart from other calculations) per key  which
>> is based on the number of such requests that are received per key in the
>> last one hour.
>>
>> Roughly my code looks like following:
>>
>> PCollection<POJO> = p
>>     .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
>>         .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
>>         .withTopic("app_access_stats")
>>         .withKeyDeserializer(StringDeserializer.class)
>>         .withValueDeserializer(ByteArrayDeserializer.class)
>>         .withConsumerConfigUpdates(kafkaConsumerProperties)
>>         .withConsumerFactoryFn(consumerFactoryObj)
>>         .commitOffsetsInFinalize())
>>     .apply("Applying_Fixed_Window_Logs", Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
>>         .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1))))
>>         .withAllowedLateness(Duration.standardDays(380))
>>         .discardingFiredPanes())
>>     .apply("Convert_KafkaRecord_To_PCollection<POJO>",
>>         ParDo.of(new ParseKafkaLogs()));
>>
>>
>> /*** Class that handles incoming PCollection<POJO> and calculate score ***/
>>
>> /**. Assume    input = incoming PCollection<POJO> as created above
>>
>> PCollectionView<Map<Key, Long>> slidingWindowHourlyUserRequestsPerKeyView
>>
>>    = input.apply("Calculate_Total_UserRequests_Past_1Hr", new WindowedNumUserRequestsPerKey()).apply(View.asMap());
>>
>> /**Calculate Running sum of num of reqs in sliding window
>>
>>     Starting sliding window of duration 1 hr every 1 sec so that we can get accurate result of past 1 hr
>>
>> **/
>>
>>
>> private static class WindowedNumUserRequestsPerKey extends PTransform<PCollection<POJO>, PCollection<KV<KEY, Long>>> {
>>
>>     @Override
>>     public PCollection<KV<KEY, Long>> expand(PCollection<POJO> input) {
>>
>>         return input
>>             .apply("Applying_Sliding_Window_1Hr_Every1sec", Window.<POJO>into(SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)))
>>                 .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())).withAllowedLateness(Duration.standardDays(360)).discardingFiredPanes())
>>             .apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
>>             .apply("Total_Requests_Per_Key", Combine.perKey(new CalculateTotalUserRequestsPerKey()));
>>     }
>>
>>     private static class GroupByAggregationKey extends DoFn<POJO, KV<KEY, POJO>> {
>>         @ProcessElement
>>         public void processElement(@Element POJO input, OutputReceiver<KV<KEY, POJO>> out) {
>>             /** code that emits required KV ****/
>>
>>         }
>>     }
>>
>>     private static class CalculateTotalUserRequestsPerKey extends Combine.CombineFn<POJO, CalculateTotalUserRequestsPerKey.TotalRequestsAccumulator, Long> {
>>         private static class TotalRequestsAccumulator implements Serializable {
>>             private long num_requests_running_sum = 0;
>>
>>             TotalRequestsAccumulator(long num_requests_running_sum) {
>>                 this.num_requests_running_sum = num_requests_running_sum;
>>             }
>>
>>             @Override
>>             public boolean equals(Object o) {
>>                 if (this == o) return true;
>>                 if (!(o instanceof TotalRequestsAccumulator)) return false;
>>                 TotalRequestsAccumulator that = (TotalRequestsAccumulator) o;
>>                 return num_requests_running_sum == that.num_requests_running_sum;
>>             }
>>
>>             @Override
>>             public int hashCode() {
>>                 return Objects.hash(num_requests_running_sum);
>>             }
>>         }
>>
>>         @Override
>>         public TotalRequestsAccumulator createAccumulator() {
>>             return new TotalRequestsAccumulator(0);
>>         }
>>
>>         @Override
>>         public TotalRequestsAccumulator addInput(TotalRequestsAccumulator mutableAccumulator, POJO input) {
>>             mutableAccumulator.num_requests_running_sum++;
>>             return mutableAccumulator;
>>         }
>>
>>         @Override
>>         public TotalRequestsAccumulator mergeAccumulators(Iterable<TotalRequestsAccumulator> accumulators) {
>>             TotalRequestsAccumulator merged = createAccumulator();
>>             for (TotalRequestsAccumulator accumulator : accumulators) {
>>                 merged.num_requests_running_sum += accumulator.num_requests_running_sum;
>>             }
>>             return merged;
>>         }
>>
>>         @Override
>>         public Long extractOutput(TotalRequestsAccumulator accumulator) {
>>             Long totalUserRequestsPerKey = accumulator.num_requests_running_sum;
>>             return totalUserRequestsPerKey;
>>         }
>>     }
>> }
>>
>> Now I calculate the score in the incoming POJO by using
>> slidingWindowHourlyUserRequestsPerKeyView as side input.
>>
>> input.apply("Add_Score", ParDo.of(new AddScore())
>>     .withSideInputs(slidingWindowHourlyUserRequestsPerKeyView));
>>
>>
>> Above seems to be working fine, though I need a suggestion if there is a
>> better way of achieving this?
>>
>> Also, I start getting problems when we have to stop the beam for a couple
>> of hours for maintenance or some other issue while data is continuously
>> being pumped in kafka.
>>
>> Problem:
>> When the beam resumes after a couple of hours, suddenly the above sliding
>> window gets bombarded with log messages and instead of honoring log's
>> timestamp, it just treats all the log messages received in the beam's
>> sliding window, thereby giving the wrong score. For eg, if the beam was
>> stopped  between 9 am and 11 am and there was 20 msgs between 9-10 am and
>> 30 msgs between 10-11 am, beam on resuming at 11, will consider the total
>> 50 msgs received in the last one hour as side input while processing all
>> log messages between 9am and 11 am.
>>
>> To overcome this, I tried a few things , but every approach failed:
>>
>> *1. In the transformation which read message from kafka and create
>> PCollection<POJO>, I tried outWithTimestamp(event timestamp), but it didn't
>> work as I believe you can't output data with older timestamp in a live
>> window while reading real time data stream from kafka.*
>>
>> *2. I thought, since the stage that add score is not honoring event's
>> timestamp (as evident by printing window.startTime in DoFn), I added custom
>> timestamp policy while reading logs from kafka i.e something like this:*
>> *    KafkaIO.read.withTimestampPolicyFactory((tp, previousWatermark) ->
>> new CustomFieldTimePolicy(previousWatermark))*
>>
>> *where CustomFieldTimePolicy set time record timestamp based on received
>> record's timestamp.*
>> *  On doing this, through the window.startTime printed a somewhat
>> accurate time which was close to even't timestamp, however,
>> "WindowedNumUserRequestsPerKey" Transformation didn't emit any output. It
>> just stalled. My print statements were showing up in
>> aforementioned GroupByAggregationKey(), but then no output was emitted as
>> if the pipeline was stuck at that stage. I couldn't find any log in GCP's
>> stackdriver indicating the reason for the stalled pipeline.*
>>
>> *Any help/suggestion for solving this case. This will be very useful in
>> our replay jobs where for some reason our data sink such as elastic search
>> gets corrupted and we want to read again all the old kafka offsets and
>> recreate the data in the new ES cluster.*
>>
>>
>> Thanks and Regards
>> Mohil
>>
>>
>>
>>

Re: Need suggestion/help for use case (usage of the side input pattern and sliding window)

Posted by Mohil Khare <mo...@prosimo.io>.
Cool ..Thanks again for you help/suggestions Luke.

Regards
Mohil

On Tue, Jun 2, 2020 at 10:42 AM Luke Cwik <lc...@google.com> wrote:

> Using side inputs is fine and is a common pattern. You should take a look
> at "slowly changing side inputs"[1] as there is some example code there.
>
> 1:
> https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-side-input-using-windowing
>
> On Mon, Jun 1, 2020 at 8:27 PM Mohil Khare <mo...@prosimo.io> wrote:
>
>> Thanks Luke for your reply.
>> I see. I am trying to recall why I added allowedLateness as 360 days.
>> Anyways I will try without that.
>>
>> But do you think the approach I am using to keep getting a running score
>> in a sliding window and then using it as a side input to decorate the main
>> log  is correct ? Or I can achieve same thing is a much better and
>> optimized way.
>>
>> Thanks again
>> Mohil
>>
>> On Mon, Jun 1, 2020 at 3:38 PM Luke Cwik <lc...@google.com> wrote:
>>
>>> Your allowed lateness is 360 days and since the trigger you have doesn't
>>> emit speculative results, you'll have to wait till the watermark advances
>>> to the end of windows timestamp + 360 days before something is output from
>>> the grouping aggregation/available at the side input.
>>>
>>>
>>> On Sat, May 30, 2020 at 12:17 PM Mohil Khare <mo...@prosimo.io> wrote:
>>>
>>>> Hello all,
>>>>
>>>> Any suggestions? Where am I going wrong or is there any better way of
>>>> achieving this so that I can do replay as well ?
>>>>
>>>> Thanks
>>>> Mohil
>>>>
>>>> On Wed, May 27, 2020 at 11:40 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>>
>>>>> Hi everyone,
>>>>> I need a suggestion regarding usage of the side input pattern and
>>>>> sliding window, especially while replaying old kafka logs/offsets.
>>>>>
>>>>> FYI: I am running beam 2.19 on google dataflow.
>>>>>
>>>>> I have a use case where I read a continuous stream of data from Kafka
>>>>> and need to calculate one score (apart from other calculations) per key
>>>>> which is based on the number of such requests that are received per key in
>>>>> the last one hour.
>>>>>
>>>>> Roughly my code looks like following:
>>>>>
>>>>> PCollection<POJO> = p
>>>>>     .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
>>>>>         .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
>>>>>         .withTopic("app_access_stats")
>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>         .withValueDeserializer(ByteArrayDeserializer.class)
>>>>>         .withConsumerConfigUpdates(kafkaConsumerProperties)
>>>>>         .withConsumerFactoryFn(consumerFactoryObj)
>>>>>         .commitOffsetsInFinalize())
>>>>>     .apply("Applying_Fixed_Window_Logs", Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>>>         .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1))))
>>>>>         .withAllowedLateness(Duration.standardDays(380))
>>>>>         .discardingFiredPanes())
>>>>>     .apply("Convert_KafkaRecord_To_PCollection<POJO>",
>>>>>         ParDo.of(new ParseKafkaLogs()));
>>>>>
>>>>>
>>>>> /*** Class that handles incoming PCollection<POJO> and calculate score ***/
>>>>>
>>>>> /**. Assume    input = incoming PCollection<POJO> as created above
>>>>>
>>>>> PCollectionView<Map<Key, Long>> slidingWindowHourlyUserRequestsPerKeyView
>>>>>
>>>>>    = input.apply("Calculate_Total_UserRequests_Past_1Hr", new WindowedNumUserRequestsPerKey()).apply(View.asMap());
>>>>>
>>>>> /**Calculate Running sum of num of reqs in sliding window
>>>>>
>>>>>     Starting sliding window of duration 1 hr every 1 sec so that we can get accurate result of past 1 hr
>>>>>
>>>>> **/
>>>>>
>>>>>
>>>>> private static class WindowedNumUserRequestsPerKey extends PTransform<PCollection<POJO>, PCollection<KV<KEY, Long>>> {
>>>>>
>>>>>     @Override
>>>>>     public PCollection<KV<KEY, Long>> expand(PCollection<POJO> input) {
>>>>>
>>>>>         return input
>>>>>             .apply("Applying_Sliding_Window_1Hr_Every1sec", Window.<POJO>into(SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)))
>>>>>                 .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())).withAllowedLateness(Duration.standardDays(360)).discardingFiredPanes())
>>>>>             .apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
>>>>>             .apply("Total_Requests_Per_Key", Combine.perKey(new CalculateTotalUserRequestsPerKey()));
>>>>>     }
>>>>>
>>>>>     private static class GroupByAggregationKey extends DoFn<POJO, KV<KEY, POJO>> {
>>>>>         @ProcessElement
>>>>>         public void processElement(@Element POJO input, OutputReceiver<KV<KEY, POJO>> out) {
>>>>>             /** code that emits required KV ****/
>>>>>
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private static class CalculateTotalUserRequestsPerKey extends Combine.CombineFn<POJO, CalculateTotalUserRequestsPerKey.TotalRequestsAccumulator, Long> {
>>>>>         private static class TotalRequestsAccumulator implements Serializable {
>>>>>             private long num_requests_running_sum = 0;
>>>>>
>>>>>             TotalRequestsAccumulator(long num_requests_running_sum) {
>>>>>                 this.num_requests_running_sum = num_requests_running_sum;
>>>>>             }
>>>>>
>>>>>             @Override
>>>>>             public boolean equals(Object o) {
>>>>>                 if (this == o) return true;
>>>>>                 if (!(o instanceof TotalRequestsAccumulator)) return false;
>>>>>                 TotalRequestsAccumulator that = (TotalRequestsAccumulator) o;
>>>>>                 return num_requests_running_sum == that.num_requests_running_sum;
>>>>>             }
>>>>>
>>>>>             @Override
>>>>>             public int hashCode() {
>>>>>                 return Objects.hash(num_requests_running_sum);
>>>>>             }
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public TotalRequestsAccumulator createAccumulator() {
>>>>>             return new TotalRequestsAccumulator(0);
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public TotalRequestsAccumulator addInput(TotalRequestsAccumulator mutableAccumulator, POJO input) {
>>>>>             mutableAccumulator.num_requests_running_sum++;
>>>>>             return mutableAccumulator;
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public TotalRequestsAccumulator mergeAccumulators(Iterable<TotalRequestsAccumulator> accumulators) {
>>>>>             TotalRequestsAccumulator merged = createAccumulator();
>>>>>             for (TotalRequestsAccumulator accumulator : accumulators) {
>>>>>                 merged.num_requests_running_sum += accumulator.num_requests_running_sum;
>>>>>             }
>>>>>             return merged;
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public Long extractOutput(TotalRequestsAccumulator accumulator) {
>>>>>             Long totalUserRequestsPerKey = accumulator.num_requests_running_sum;
>>>>>             return totalUserRequestsPerKey;
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> Now I calculate the score in the incoming POJO by using
>>>>> slidingWindowHourlyUserRequestsPerKeyView as side input.
>>>>>
>>>>> input.apply("Add_Score", ParDo.of(new AddScore())
>>>>>     .withSideInputs(slidingWindowHourlyUserRequestsPerKeyView));
>>>>>
>>>>>
>>>>> Above seems to be working fine, though I need a suggestion if there is
>>>>> a better way of achieving this?
>>>>>
>>>>> Also, I start getting problems when we have to stop the beam for a
>>>>> couple of hours for maintenance or some other issue while data is
>>>>> continuously being pumped in kafka.
>>>>>
>>>>> Problem:
>>>>> When the beam resumes after a couple of hours, suddenly the above
>>>>> sliding window gets bombarded with log messages and instead of honoring
>>>>> log's timestamp, it just treats all the log messages received in the beam's
>>>>> sliding window, thereby giving the wrong score. For eg, if the beam was
>>>>> stopped  between 9 am and 11 am and there was 20 msgs between 9-10 am and
>>>>> 30 msgs between 10-11 am, beam on resuming at 11, will consider the total
>>>>> 50 msgs received in the last one hour as side input while processing all
>>>>> log messages between 9am and 11 am.
>>>>>
>>>>> To overcome this, I tried a few things , but every approach failed:
>>>>>
>>>>> *1. In the transformation which read message from kafka and create
>>>>> PCollection<POJO>, I tried outWithTimestamp(event timestamp), but it didn't
>>>>> work as I believe you can't output data with older timestamp in a live
>>>>> window while reading real time data stream from kafka.*
>>>>>
>>>>> *2. I thought, since the stage that add score is not honoring event's
>>>>> timestamp (as evident by printing window.startTime in DoFn), I added custom
>>>>> timestamp policy while reading logs from kafka i.e something like this:*
>>>>> *    KafkaIO.read.withTimestampPolicyFactory((tp, previousWatermark)
>>>>> -> new CustomFieldTimePolicy(previousWatermark))*
>>>>>
>>>>> *where CustomFieldTimePolicy set time record timestamp based on
>>>>> received record's timestamp.*
>>>>> *  On doing this, through the window.startTime printed a somewhat
>>>>> accurate time which was close to even't timestamp, however,
>>>>> "WindowedNumUserRequestsPerKey" Transformation didn't emit any output. It
>>>>> just stalled. My print statements were showing up in
>>>>> aforementioned GroupByAggregationKey(), but then no output was emitted as
>>>>> if the pipeline was stuck at that stage. I couldn't find any log in GCP's
>>>>> stackdriver indicating the reason for the stalled pipeline.*
>>>>>
>>>>> *Any help/suggestion for solving this case. This will be very useful
>>>>> in our replay jobs where for some reason our data sink such as elastic
>>>>> search gets corrupted and we want to read again all the old kafka offsets
>>>>> and recreate the data in the new ES cluster.*
>>>>>
>>>>>
>>>>> Thanks and Regards
>>>>> Mohil
>>>>>
>>>>>
>>>>>
>>>>>

Re: Need suggestion/help for use case (usage of the side input pattern and sliding window)

Posted by Luke Cwik <lc...@google.com>.
Using side inputs is fine and is a common pattern. You should take a look
at "slowly changing side inputs"[1] as there is some example code there.

1:
https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-side-input-using-windowing

On Mon, Jun 1, 2020 at 8:27 PM Mohil Khare <mo...@prosimo.io> wrote:

> Thanks Luke for your reply.
> I see. I am trying to recall why I added allowedLateness as 360 days.
> Anyways I will try without that.
>
> But do you think the approach I am using to keep getting a running score
> in a sliding window and then using it as a side input to decorate the main
> log  is correct ? Or I can achieve same thing is a much better and
> optimized way.
>
> Thanks again
> Mohil
>
> On Mon, Jun 1, 2020 at 3:38 PM Luke Cwik <lc...@google.com> wrote:
>
>> Your allowed lateness is 360 days and since the trigger you have doesn't
>> emit speculative results, you'll have to wait till the watermark advances
>> to the end of windows timestamp + 360 days before something is output from
>> the grouping aggregation/available at the side input.
>>
>>
>> On Sat, May 30, 2020 at 12:17 PM Mohil Khare <mo...@prosimo.io> wrote:
>>
>>> Hello all,
>>>
>>> Any suggestions? Where am I going wrong or is there any better way of
>>> achieving this so that I can do replay as well ?
>>>
>>> Thanks
>>> Mohil
>>>
>>> On Wed, May 27, 2020 at 11:40 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>
>>>> Hi everyone,
>>>> I need a suggestion regarding usage of the side input pattern and
>>>> sliding window, especially while replaying old kafka logs/offsets.
>>>>
>>>> FYI: I am running beam 2.19 on google dataflow.
>>>>
>>>> I have a use case where I read a continuous stream of data from Kafka
>>>> and need to calculate one score (apart from other calculations) per key
>>>> which is based on the number of such requests that are received per key in
>>>> the last one hour.
>>>>
>>>> Roughly my code looks like following:
>>>>
>>>> PCollection<POJO> = p
>>>>     .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
>>>>         .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
>>>>         .withTopic("app_access_stats")
>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>         .withValueDeserializer(ByteArrayDeserializer.class)
>>>>         .withConsumerConfigUpdates(kafkaConsumerProperties)
>>>>         .withConsumerFactoryFn(consumerFactoryObj)
>>>>         .commitOffsetsInFinalize())
>>>>     .apply("Applying_Fixed_Window_Logs", Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>>         .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1))))
>>>>         .withAllowedLateness(Duration.standardDays(380))
>>>>         .discardingFiredPanes())
>>>>     .apply("Convert_KafkaRecord_To_PCollection<POJO>",
>>>>         ParDo.of(new ParseKafkaLogs()));
>>>>
>>>>
>>>> /*** Class that handles incoming PCollection<POJO> and calculate score ***/
>>>>
>>>> /**. Assume    input = incoming PCollection<POJO> as created above
>>>>
>>>> PCollectionView<Map<Key, Long>> slidingWindowHourlyUserRequestsPerKeyView
>>>>
>>>>    = input.apply("Calculate_Total_UserRequests_Past_1Hr", new WindowedNumUserRequestsPerKey()).apply(View.asMap());
>>>>
>>>> /**Calculate Running sum of num of reqs in sliding window
>>>>
>>>>     Starting sliding window of duration 1 hr every 1 sec so that we can get accurate result of past 1 hr
>>>>
>>>> **/
>>>>
>>>>
>>>> private static class WindowedNumUserRequestsPerKey extends PTransform<PCollection<POJO>, PCollection<KV<KEY, Long>>> {
>>>>
>>>>     @Override
>>>>     public PCollection<KV<KEY, Long>> expand(PCollection<POJO> input) {
>>>>
>>>>         return input
>>>>             .apply("Applying_Sliding_Window_1Hr_Every1sec", Window.<POJO>into(SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)))
>>>>                 .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())).withAllowedLateness(Duration.standardDays(360)).discardingFiredPanes())
>>>>             .apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
>>>>             .apply("Total_Requests_Per_Key", Combine.perKey(new CalculateTotalUserRequestsPerKey()));
>>>>     }
>>>>
>>>>     private static class GroupByAggregationKey extends DoFn<POJO, KV<KEY, POJO>> {
>>>>         @ProcessElement
>>>>         public void processElement(@Element POJO input, OutputReceiver<KV<KEY, POJO>> out) {
>>>>             /** code that emits required KV ****/
>>>>
>>>>         }
>>>>     }
>>>>
>>>>     private static class CalculateTotalUserRequestsPerKey extends Combine.CombineFn<POJO, CalculateTotalUserRequestsPerKey.TotalRequestsAccumulator, Long> {
>>>>         private static class TotalRequestsAccumulator implements Serializable {
>>>>             private long num_requests_running_sum = 0;
>>>>
>>>>             TotalRequestsAccumulator(long num_requests_running_sum) {
>>>>                 this.num_requests_running_sum = num_requests_running_sum;
>>>>             }
>>>>
>>>>             @Override
>>>>             public boolean equals(Object o) {
>>>>                 if (this == o) return true;
>>>>                 if (!(o instanceof TotalRequestsAccumulator)) return false;
>>>>                 TotalRequestsAccumulator that = (TotalRequestsAccumulator) o;
>>>>                 return num_requests_running_sum == that.num_requests_running_sum;
>>>>             }
>>>>
>>>>             @Override
>>>>             public int hashCode() {
>>>>                 return Objects.hash(num_requests_running_sum);
>>>>             }
>>>>         }
>>>>
>>>>         @Override
>>>>         public TotalRequestsAccumulator createAccumulator() {
>>>>             return new TotalRequestsAccumulator(0);
>>>>         }
>>>>
>>>>         @Override
>>>>         public TotalRequestsAccumulator addInput(TotalRequestsAccumulator mutableAccumulator, POJO input) {
>>>>             mutableAccumulator.num_requests_running_sum++;
>>>>             return mutableAccumulator;
>>>>         }
>>>>
>>>>         @Override
>>>>         public TotalRequestsAccumulator mergeAccumulators(Iterable<TotalRequestsAccumulator> accumulators) {
>>>>             TotalRequestsAccumulator merged = createAccumulator();
>>>>             for (TotalRequestsAccumulator accumulator : accumulators) {
>>>>                 merged.num_requests_running_sum += accumulator.num_requests_running_sum;
>>>>             }
>>>>             return merged;
>>>>         }
>>>>
>>>>         @Override
>>>>         public Long extractOutput(TotalRequestsAccumulator accumulator) {
>>>>             Long totalUserRequestsPerKey = accumulator.num_requests_running_sum;
>>>>             return totalUserRequestsPerKey;
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> Now I calculate the score in the incoming POJO by using
>>>> slidingWindowHourlyUserRequestsPerKeyView as side input.
>>>>
>>>> input.apply("Add_Score", ParDo.of(new AddScore())
>>>>     .withSideInputs(slidingWindowHourlyUserRequestsPerKeyView));
>>>>
>>>>
>>>> Above seems to be working fine, though I need a suggestion if there is
>>>> a better way of achieving this?
>>>>
>>>> Also, I start getting problems when we have to stop the beam for a
>>>> couple of hours for maintenance or some other issue while data is
>>>> continuously being pumped in kafka.
>>>>
>>>> Problem:
>>>> When the beam resumes after a couple of hours, suddenly the above
>>>> sliding window gets bombarded with log messages and instead of honoring
>>>> log's timestamp, it just treats all the log messages received in the beam's
>>>> sliding window, thereby giving the wrong score. For eg, if the beam was
>>>> stopped  between 9 am and 11 am and there was 20 msgs between 9-10 am and
>>>> 30 msgs between 10-11 am, beam on resuming at 11, will consider the total
>>>> 50 msgs received in the last one hour as side input while processing all
>>>> log messages between 9am and 11 am.
>>>>
>>>> To overcome this, I tried a few things , but every approach failed:
>>>>
>>>> *1. In the transformation which read message from kafka and create
>>>> PCollection<POJO>, I tried outWithTimestamp(event timestamp), but it didn't
>>>> work as I believe you can't output data with older timestamp in a live
>>>> window while reading real time data stream from kafka.*
>>>>
>>>> *2. I thought, since the stage that add score is not honoring event's
>>>> timestamp (as evident by printing window.startTime in DoFn), I added custom
>>>> timestamp policy while reading logs from kafka i.e something like this:*
>>>> *    KafkaIO.read.withTimestampPolicyFactory((tp, previousWatermark) ->
>>>> new CustomFieldTimePolicy(previousWatermark))*
>>>>
>>>> *where CustomFieldTimePolicy set time record timestamp based on
>>>> received record's timestamp.*
>>>> *  On doing this, through the window.startTime printed a somewhat
>>>> accurate time which was close to even't timestamp, however,
>>>> "WindowedNumUserRequestsPerKey" Transformation didn't emit any output. It
>>>> just stalled. My print statements were showing up in
>>>> aforementioned GroupByAggregationKey(), but then no output was emitted as
>>>> if the pipeline was stuck at that stage. I couldn't find any log in GCP's
>>>> stackdriver indicating the reason for the stalled pipeline.*
>>>>
>>>> *Any help/suggestion for solving this case. This will be very useful in
>>>> our replay jobs where for some reason our data sink such as elastic search
>>>> gets corrupted and we want to read again all the old kafka offsets and
>>>> recreate the data in the new ES cluster.*
>>>>
>>>>
>>>> Thanks and Regards
>>>> Mohil
>>>>
>>>>
>>>>
>>>>

Re: Need suggestion/help for use case (usage of the side input pattern and sliding window)

Posted by Mohil Khare <mo...@prosimo.io>.
Thanks Luke for your reply.
I see. I am trying to recall why I added allowedLateness as 360 days.
Anyways I will try without that.

But do you think the approach I am using to keep getting a running score in
a sliding window and then using it as a side input to decorate the main
log  is correct ? Or I can achieve same thing is a much better and
optimized way.

Thanks again
Mohil

On Mon, Jun 1, 2020 at 3:38 PM Luke Cwik <lc...@google.com> wrote:

> Your allowed lateness is 360 days and since the trigger you have doesn't
> emit speculative results, you'll have to wait till the watermark advances
> to the end of windows timestamp + 360 days before something is output from
> the grouping aggregation/available at the side input.
>
>
> On Sat, May 30, 2020 at 12:17 PM Mohil Khare <mo...@prosimo.io> wrote:
>
>> Hello all,
>>
>> Any suggestions? Where am I going wrong or is there any better way of
>> achieving this so that I can do replay as well ?
>>
>> Thanks
>> Mohil
>>
>> On Wed, May 27, 2020 at 11:40 AM Mohil Khare <mo...@prosimo.io> wrote:
>>
>>> Hi everyone,
>>> I need a suggestion regarding usage of the side input pattern and
>>> sliding window, especially while replaying old kafka logs/offsets.
>>>
>>> FYI: I am running beam 2.19 on google dataflow.
>>>
>>> I have a use case where I read a continuous stream of data from Kafka
>>> and need to calculate one score (apart from other calculations) per key
>>> which is based on the number of such requests that are received per key in
>>> the last one hour.
>>>
>>> Roughly my code looks like following:
>>>
>>> PCollection<POJO> = p
>>>     .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
>>>         .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
>>>         .withTopic("app_access_stats")
>>>         .withKeyDeserializer(StringDeserializer.class)
>>>         .withValueDeserializer(ByteArrayDeserializer.class)
>>>         .withConsumerConfigUpdates(kafkaConsumerProperties)
>>>         .withConsumerFactoryFn(consumerFactoryObj)
>>>         .commitOffsetsInFinalize())
>>>     .apply("Applying_Fixed_Window_Logs", Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>         .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1))))
>>>         .withAllowedLateness(Duration.standardDays(380))
>>>         .discardingFiredPanes())
>>>     .apply("Convert_KafkaRecord_To_PCollection<POJO>",
>>>         ParDo.of(new ParseKafkaLogs()));
>>>
>>>
>>> /*** Class that handles incoming PCollection<POJO> and calculate score ***/
>>>
>>> /**. Assume    input = incoming PCollection<POJO> as created above
>>>
>>> PCollectionView<Map<Key, Long>> slidingWindowHourlyUserRequestsPerKeyView
>>>
>>>    = input.apply("Calculate_Total_UserRequests_Past_1Hr", new WindowedNumUserRequestsPerKey()).apply(View.asMap());
>>>
>>> /**Calculate Running sum of num of reqs in sliding window
>>>
>>>     Starting sliding window of duration 1 hr every 1 sec so that we can get accurate result of past 1 hr
>>>
>>> **/
>>>
>>>
>>> private static class WindowedNumUserRequestsPerKey extends PTransform<PCollection<POJO>, PCollection<KV<KEY, Long>>> {
>>>
>>>     @Override
>>>     public PCollection<KV<KEY, Long>> expand(PCollection<POJO> input) {
>>>
>>>         return input
>>>             .apply("Applying_Sliding_Window_1Hr_Every1sec", Window.<POJO>into(SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)))
>>>                 .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())).withAllowedLateness(Duration.standardDays(360)).discardingFiredPanes())
>>>             .apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
>>>             .apply("Total_Requests_Per_Key", Combine.perKey(new CalculateTotalUserRequestsPerKey()));
>>>     }
>>>
>>>     private static class GroupByAggregationKey extends DoFn<POJO, KV<KEY, POJO>> {
>>>         @ProcessElement
>>>         public void processElement(@Element POJO input, OutputReceiver<KV<KEY, POJO>> out) {
>>>             /** code that emits required KV ****/
>>>
>>>         }
>>>     }
>>>
>>>     private static class CalculateTotalUserRequestsPerKey extends Combine.CombineFn<POJO, CalculateTotalUserRequestsPerKey.TotalRequestsAccumulator, Long> {
>>>         private static class TotalRequestsAccumulator implements Serializable {
>>>             private long num_requests_running_sum = 0;
>>>
>>>             TotalRequestsAccumulator(long num_requests_running_sum) {
>>>                 this.num_requests_running_sum = num_requests_running_sum;
>>>             }
>>>
>>>             @Override
>>>             public boolean equals(Object o) {
>>>                 if (this == o) return true;
>>>                 if (!(o instanceof TotalRequestsAccumulator)) return false;
>>>                 TotalRequestsAccumulator that = (TotalRequestsAccumulator) o;
>>>                 return num_requests_running_sum == that.num_requests_running_sum;
>>>             }
>>>
>>>             @Override
>>>             public int hashCode() {
>>>                 return Objects.hash(num_requests_running_sum);
>>>             }
>>>         }
>>>
>>>         @Override
>>>         public TotalRequestsAccumulator createAccumulator() {
>>>             return new TotalRequestsAccumulator(0);
>>>         }
>>>
>>>         @Override
>>>         public TotalRequestsAccumulator addInput(TotalRequestsAccumulator mutableAccumulator, POJO input) {
>>>             mutableAccumulator.num_requests_running_sum++;
>>>             return mutableAccumulator;
>>>         }
>>>
>>>         @Override
>>>         public TotalRequestsAccumulator mergeAccumulators(Iterable<TotalRequestsAccumulator> accumulators) {
>>>             TotalRequestsAccumulator merged = createAccumulator();
>>>             for (TotalRequestsAccumulator accumulator : accumulators) {
>>>                 merged.num_requests_running_sum += accumulator.num_requests_running_sum;
>>>             }
>>>             return merged;
>>>         }
>>>
>>>         @Override
>>>         public Long extractOutput(TotalRequestsAccumulator accumulator) {
>>>             Long totalUserRequestsPerKey = accumulator.num_requests_running_sum;
>>>             return totalUserRequestsPerKey;
>>>         }
>>>     }
>>> }
>>>
>>> Now I calculate the score in the incoming POJO by using
>>> slidingWindowHourlyUserRequestsPerKeyView as side input.
>>>
>>> input.apply("Add_Score", ParDo.of(new AddScore())
>>>     .withSideInputs(slidingWindowHourlyUserRequestsPerKeyView));
>>>
>>>
>>> Above seems to be working fine, though I need a suggestion if there is a
>>> better way of achieving this?
>>>
>>> Also, I start getting problems when we have to stop the beam for a
>>> couple of hours for maintenance or some other issue while data is
>>> continuously being pumped in kafka.
>>>
>>> Problem:
>>> When the beam resumes after a couple of hours, suddenly the above
>>> sliding window gets bombarded with log messages and instead of honoring
>>> log's timestamp, it just treats all the log messages received in the beam's
>>> sliding window, thereby giving the wrong score. For eg, if the beam was
>>> stopped  between 9 am and 11 am and there was 20 msgs between 9-10 am and
>>> 30 msgs between 10-11 am, beam on resuming at 11, will consider the total
>>> 50 msgs received in the last one hour as side input while processing all
>>> log messages between 9am and 11 am.
>>>
>>> To overcome this, I tried a few things , but every approach failed:
>>>
>>> *1. In the transformation which read message from kafka and create
>>> PCollection<POJO>, I tried outWithTimestamp(event timestamp), but it didn't
>>> work as I believe you can't output data with older timestamp in a live
>>> window while reading real time data stream from kafka.*
>>>
>>> *2. I thought, since the stage that add score is not honoring event's
>>> timestamp (as evident by printing window.startTime in DoFn), I added custom
>>> timestamp policy while reading logs from kafka i.e something like this:*
>>> *    KafkaIO.read.withTimestampPolicyFactory((tp, previousWatermark) ->
>>> new CustomFieldTimePolicy(previousWatermark))*
>>>
>>> *where CustomFieldTimePolicy set time record timestamp based on received
>>> record's timestamp.*
>>> *  On doing this, through the window.startTime printed a somewhat
>>> accurate time which was close to even't timestamp, however,
>>> "WindowedNumUserRequestsPerKey" Transformation didn't emit any output. It
>>> just stalled. My print statements were showing up in
>>> aforementioned GroupByAggregationKey(), but then no output was emitted as
>>> if the pipeline was stuck at that stage. I couldn't find any log in GCP's
>>> stackdriver indicating the reason for the stalled pipeline.*
>>>
>>> *Any help/suggestion for solving this case. This will be very useful in
>>> our replay jobs where for some reason our data sink such as elastic search
>>> gets corrupted and we want to read again all the old kafka offsets and
>>> recreate the data in the new ES cluster.*
>>>
>>>
>>> Thanks and Regards
>>> Mohil
>>>
>>>
>>>
>>>