Posted to user@beam.apache.org by Sigalit Eliazov <e....@gmail.com> on 2023/11/16 22:36:28 UTC

Pipeline Stalls at GroupByKey Step

Hi,

In our pipeline we've encountered an issue with the GroupByKey step.
After running for some time, messages stop progressing through the
GroupByKey step and the pipeline stalls. To troubleshoot, we added debug
logging before and after the GroupByKey step.

We are using Beam 2.50 with Flink 1.16, running with a single task
manager, 2 slots, parallelism 2, and no HA.

Any insights or suggestions? The messages are KVs with a String key and
an Avro message value.

PCollection<KV<String, IpSessionInfoWithKey>> ipSessionInput = pipeline
    .apply("readIpSessionInfo", KafkaTransform.readAvroMessageFromKafka(
        pipelineUtil.getBootstrapServers(),
        options.getSourceKafkaTopic(),
        PIPELINE_NAME,
        IpSessionInfo.class,
        IpSessionInfoDeserializer.class))
    .apply("ConvertIpSessionFromKafka", ParDo.of(new ConvertFromKafkaRecord<>()))
    .apply(Window.<KV<String, IpSessionInfo>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply("DebugLogBeforeGroupByKey", ParDo.of(
        new DoFn<KV<String, IpSessionInfo>, KV<String, IpSessionInfo>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                KV<String, IpSessionInfo> element = c.element();
                log.atInfo().log("Before GroupByKey: " + element);
                c.output(element);
            }
        }))
    .apply(GroupByKey.create())
    .apply("DebugLogAfterGroupByKey", ParDo.of(
        new DoFn<KV<String, Iterable<IpSessionInfo>>, KV<String, Iterable<IpSessionInfo>>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                KV<String, Iterable<IpSessionInfo>> groupedElement = c.element();
                log.atInfo().log("After GroupByKey: " + groupedElement);
                c.output(groupedElement);
            }
        }))
    .apply(ParDo.of(new ConvertIpSession()));


thanks
Sigalit

Re: Pipeline Stalls at GroupByKey Step

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Sigalit,

you should set a TimestampPolicyFactory [1] on the source, because 
otherwise re-stamping elements in a plain ParDo 
(ConvertFromKafkaRecord) can shift an element's timestamp back in time 
behind the watermark, and the GroupByKey transform will subsequently 
drop that data as late. If you don't set any timestamp policy 
explicitly, the default is processing time, which is likely what causes 
the effect you observe. An alternative is to use state and timers in 
the ConvertFromKafkaRecord transform to make sure the transform 
correctly holds the output watermark using Timer.withOutputTimestamp 
[2]. I'd go with option 1), though.
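
Assuming KafkaTransform.readAvroMessageFromKafka wraps KafkaIO.read(), a 
minimal sketch of option 1) would be to set the factory inside that 
helper, e.g. with the broker's log-append time (if the topic is 
configured for LogAppendTime; server and topic names are placeholders):

KafkaIO.<String, IpSessionInfo>read()
    .withBootstrapServers(bootstrapServers)
    .withTopic(topic)
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(IpSessionInfoDeserializer.class)
    // Element timestamps and the watermark now track Kafka's
    // LogAppendTime rather than the runner's processing time.
    .withTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime());

TimestampPolicyFactory.withCreateTime(...) or a custom factory plug in 
the same way; see [1].

For option 2), a rough sketch (names hypothetical) of a keyed 
re-stamping step that holds the output watermark with a processing-time 
timer; timers need a KV input, so it would run on the output of 
ConvertFromKafkaRecord:

static class ReStampWithHold
    extends DoFn<KV<String, IpSessionInfo>, KV<String, IpSessionInfo>> {

  @TimerId("hold")
  private final TimerSpec holdSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(ProcessContext c, @TimerId("hold") Timer hold) {
    // Hold the output watermark at the element's timestamp until the
    // timer fires, so re-stamped elements cannot fall behind it.
    hold.withOutputTimestamp(c.timestamp())
        .offset(Duration.standardMinutes(1))
        .setRelative();
    c.outputWithTimestamp(c.element(), Instant.now());
  }

  @OnTimer("hold")
  public void onHold() {
    // No-op; firing the timer releases the watermark hold.
  }
}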

Best,

  Jan

[1] 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory-
[2] 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/state/Timer.html#withOutputTimestamp-org.joda.time.Instant-

On 11/17/23 08:22, Sigalit Eliazov wrote:
> Yes, the output of ConvertFromKafkaRecord is emitted with a timestamp:
> KafkaRecord<String, T> record = c.element();
> KV<String, T> entry = Objects.requireNonNull(record).getKV();
> String key = convertKey(entry.getKey());
> T value = convertValue(entry.getValue());
> c.outputWithTimestamp(KV.of(key, value), Instant.now());
> thanks
> Sigalit
>
> On Fri, Nov 17, 2023 at 4:36 AM Sachin Mittal <sj...@gmail.com> wrote:
>
>     Do you add a timestamp to every record you output in the
>     ConvertFromKafkaRecord step, or in any step before that?
>
>     On Fri, 17 Nov 2023 at 4:07 AM, Sigalit Eliazov
>     <e....@gmail.com> wrote:
>
>         [snip]

Re: Pipeline Stalls at GroupByKey Step

Posted by Sigalit Eliazov <e....@gmail.com>.
Yes, the output of ConvertFromKafkaRecord is emitted with a timestamp:

KafkaRecord<String, T> record = c.element();
KV<String, T> entry = Objects.requireNonNull(record).getKV();
String key = convertKey(entry.getKey());
T value = convertValue(entry.getValue());
c.outputWithTimestamp(KV.of(key, value), Instant.now());
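
A variant of the last line that stamps each element with the Kafka 
record's own broker timestamp (a sketch; per Jan's reply above, the 
watermark itself is still governed by the source's timestamp policy):

// KafkaRecord.getTimestamp() returns the broker-assigned epoch millis.
c.outputWithTimestamp(KV.of(key, value), new Instant(record.getTimestamp()));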


thanks

Sigalit


On Fri, Nov 17, 2023 at 4:36 AM Sachin Mittal <sj...@gmail.com> wrote:

> Do you add a timestamp to every record you output in the
> ConvertFromKafkaRecord step, or in any step before that?
>
> On Fri, 17 Nov 2023 at 4:07 AM, Sigalit Eliazov <e....@gmail.com>
> wrote:
>
>> [snip]

Re: Pipeline Stalls at GroupByKey Step

Posted by Sachin Mittal <sj...@gmail.com>.
Do you add a timestamp to every record you output in the
ConvertFromKafkaRecord step, or in any step before that?

On Fri, 17 Nov 2023 at 4:07 AM, Sigalit Eliazov <e....@gmail.com> wrote:

> [snip]