You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Rion Williams <ri...@gmail.com> on 2021/02/12 02:48:26 UTC

Pull Request Review / Feature Feedback for KafkaIO

Hi all,

Recently, I encountered a bit of functionality for a pipeline that I was
working that seemed to be slightly lacking (specifically the recognition of
explicitly defined partitioning in the KafkaIO.WriteRecords transform) so I
put together a JIRA related to it [1] as well as a more detailed pull
request [1] with an initial potential fix/change.

I'll provide a bit more context from the pull request description below in
case in-thread feedback would be easier for some, but any
recommendations/reviewers/advice would be greatly appreciated!

Cheers,

Rion

[1]: https://issues.apache.org/jira/browse/BEAM-11806
[2]: https://github.com/apache/beam/pull/13975

----------------------------------

At present, the WriteRecords transform for KafkaIO does not recognize the
partition property defined on ProducerRecord instances consumed by the
transform:

producer.send(
        // The null property in the following constructor represents partition
        new ProducerRecord<>(
            topicName, null, timestampMillis, record.key(),
record.value(), record.headers()),
        new SendCallback());

Because of this limitation, in a scenario where a user may desire an
explicitly defined partitioning strategy as opposed to round-robin, they
would have to create their own custom DoFn that defines a KafkaProducer
(preferably within a @StartBundle) similar to the following approach (in
Kotlin):

private class ExampleProducerDoFn(...): DoFn<...>() {
        private lateinit var producer: KafkaProducer<...>

        @StartBundle
        fun startBundle(context: StartBundleContext) {
            val options =
context.pipelineOptions.`as`(YourPipelineOptions::class.java)
            producer = getKafkaProducer(options)
        }

        @ProcessElement
        fun processElement(context: ProcessContext){
            // Omitted for brevity

            // Produce the record to a specific topic at a specific partition
            producer.send(ProducerRecord(
                "your_topic_here",
                your_partition_here,
                context.element().kv.key,
                context.element().kv.value
            ))
        }

The *initial* pull request that I threw in here simply replaces the
existing null with the record.partition() (i.e. the record that was
explicitly defined initially, but it may require some other changes which
I'd need someone more familiar with the KafkaIO source to chime in on.

Re: Pull Request Review / Feature Feedback for KafkaIO

Posted by Alexey Romanenko <ar...@gmail.com>.
Thank you Rion for working on this, I think it was missed for some unknown (I don’t recall why...) reasons.

I’ll take a look on your PR.

Alexey

> On 12 Feb 2021, at 03:48, Rion Williams <ri...@gmail.com> wrote:
> 
> Hi all,
> 
> Recently, I encountered a bit of functionality for a pipeline that I was working that seemed to be slightly lacking (specifically the recognition of explicitly defined partitioning in the KafkaIO.WriteRecords transform) so I put together a JIRA related to it [1] as well as a more detailed pull request [1] with an initial potential fix/change.
> 
> I'll provide a bit more context from the pull request description below in case in-thread feedback would be easier for some, but any recommendations/reviewers/advice would be greatly appreciated!
> 
> Cheers,
> 
> Rion
> 
> [1]: https://issues.apache.org/jira/browse/BEAM-11806 <https://issues.apache.org/jira/browse/BEAM-11806>
> [2]: https://github.com/apache/beam/pull/13975 <https://github.com/apache/beam/pull/13975>
> 
> ----------------------------------
> 
> At present, the WriteRecords transform for KafkaIO does not recognize the partition property defined on ProducerRecord instances consumed by the transform:
> 
> producer.send(
>         // The null property in the following constructor represents partition
>         new ProducerRecord<>(
>             topicName, null, timestampMillis, record.key(), record.value(), record.headers()),
>         new SendCallback());
> Because of this limitation, in a scenario where a user may desire an explicitly defined partitioning strategy as opposed to round-robin, they would have to create their own custom DoFn that defines a KafkaProducer (preferably within a @StartBundle) similar to the following approach (in Kotlin):
> 
> private class ExampleProducerDoFn(...): DoFn<...>() {
>         private lateinit var producer: KafkaProducer<...>
> 
>         @StartBundle
>         fun startBundle(context: StartBundleContext) {
>             val options = context.pipelineOptions.`as`(YourPipelineOptions::class.java)
>             producer = getKafkaProducer(options)
>         }
> 
>         @ProcessElement
>         fun processElement(context: ProcessContext){
>             // Omitted for brevity
>             
>             // Produce the record to a specific topic at a specific partition
>             producer.send(ProducerRecord(
>                 "your_topic_here",
>                 your_partition_here,
>                 context.element().kv.key,
>                 context.element().kv.value
>             ))
>         }
> The initial pull request that I threw in here simply replaces the existing null with the record.partition() (i.e. the record that was explicitly defined initially, but it may require some other changes which I'd need someone more familiar with the KafkaIO source to chime in on.
> 
>