You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Rion Williams <ri...@gmail.com> on 2021/02/01 16:06:05 UTC

Separating Data from Kafka by Keying Strategy in a Kafka Splittable DoFn

Hey all,

I'm currently in a situation where I have a single Kafka topic with data
across multiple partitions and covers data from multiple sources. I'm
trying to see if there's a way that I'd be able to accomplish reading from
these different sources as different pipelines and if a Splittable DoFn can
do this.

Basically - what I'd like to do is for a given key on a record, treat this
as a separate pipeline from Kafka:

testPipeline
    .apply(
        /*
            Apply some function here to tell Kafka how to describe how
to split up
            the sources that I want to read from
         */
    )
    .apply("Ready from Kafka", KafkaIO.read(...))
    .apply("Remaining Pipeline Omitted for Brevity"

Is it possible to do this? I'm trying to avoid a major architectural change
that would require multiple separate topics by source, however if I can
guarantee that a given key (and it's associated watermark) are treated
separately, that would be ideal.

Any advice or recommendations for a strategy that might work would be
helpful!

Thanks,

Rion

Re: Separating Data from Kafka by Keying Strategy in a Kafka Splittable DoFn

Posted by Boyuan Zhang <bo...@google.com>.
Hi Rion,

Thanks for the explanation. I can see the case now. To my knowledge,
Splittable DoFn cannot help on this case and if you want watermark from
sources to be separated, I believe you have to have them in deperated
pipelines. I don't think we support per-key watermark like ifeature n one
pipeline.

On Mon, Feb 1, 2021 at 12:26 PM Rion Williams <ri...@gmail.com> wrote:

> Hi again Boyuan,
>
> Close, I believe. I'll describe the scenario a bit more specifically.
> Basically, I have a Kafka topic with 10 partitions and each of these
> contains records for various combinations of tenants and sources that come
> in interspersed across these partitions. This pipeline applies some
> windowing downstream, however I think for that to work properly the
> pipelines would need to be segregated in some fashion so data coming in for
> one tenant or source doesn't interfere with windowing for another.
>
> The pipeline itself looks like this:
>
> val pipeline = Pipeline.create(options)
>
> // Partition Events according to their data sources
> val partitionedEvents = pipeline
>     .apply("Read Events from Kafka",
>         KafkaIO
>             .read<String, Log>()
>             .withBootstrapServers(options.brokerUrl)
>             .withTopic(options.logsTopic)
>             .withKeyDeserializer(StringDeserializer::class.java)
>             .withValueDeserializerAndCoder(
>                 SpecificAvroDeserializer<Log>()::class.java,
>                 AvroCoder.of(Log::class.java)
>             )
>             .withReadCommitted()
>             .commitOffsetsInFinalize()
>             .withTimestampPolicyFactory { _, previousWatermark -> WatermarkPolicy(previousWatermark) }
>             .withConsumerConfigUpdates(
>             ImmutableMap.of<String, Any>(
>                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
>                 ConsumerConfig.GROUP_ID_CONFIG, "log-processor-pipeline",
>                 "schema.registry.url", options.schemaRegistryUrl
>             )
>         ).withoutMetadata()
>     )
>     .apply("Log Events", ParDo.of(Logs.log()))
>     .apply("Rekey Logs by Tenant", ParDo.of(Logs.key()))
>     .apply("Partition Logs by Source", Partition.of(sources.size, Events.partition<KV<String, Log>>(sources)))
>
> dataSources.forEach { dataSource ->
>     // Store a reference to the data source name to avoid serialization issues
>     val sourceName = dataSource.name
>
>     // Apply source-specific windowing strategies
>     partitionedLogs[dataSource.partition]
>         .apply("Building Windows for $sourceName", SourceSpecificWindow.of<KV<String, Log>>(dataSource))
>         .apply("Group Windowed Logs by Key for $sourceName", GroupByKey.create())
>         .apply("Log After Windowing for $sourceName", ParDo.of(Logs.logAfterWindowing()))
>         .apply(
>             "Writing Windowed Logs to Files for $sourceName",
>             FileIO.writeDynamic<String, KV<String, MutableIterable<Log>>>()
>                 .withNumShards(1)
>                 .by { row -> "${row.key}/${sourceName}" }
>                 .withDestinationCoder(StringUtf8Coder.of())
>                 .via(Contextful.fn(SerializableFunction { logs -> Files.stringify(logs.value) }), TextIO.sink())
>                 .to(options.output)
>                 .withNaming { partition -> Files.name(partition)}
>         )
> }
>
> pipeline.run().waitUntilFinish()
>
> Sorry - I know that's a lot, but in a nutshell I'm attempting to:
>
>    - Read from a multi-tenant/source topic (10 partitions)
>    - Partition those events by source
>    - Window events according to their defined source (according to
>    event-time fields within the records)
>    - Write out files on windows closing to the appropriate tenant/source
>    directory
>
> At present, it seems that because the WatermarkPolicy is only capable of
> keeping a separate watermark per partition and since it is using an
> event-time property to handle that, that multiple tenants/source
> combinations could impact others, cause windows to close unexpected/early,
> data to be missed, etc. This is why I believe that perhaps a SDF that was
> evaluated prior to reading from Kafka could allow me to treat each of these
> tenant-source pairs as separate pipelines without a major architectural
> overhaul.
>
> Is this something that an SDF might excel at or is there some other
> mechanism that I might consider to accomplish this?
>
>
>
> On Mon, Feb 1, 2021 at 1:09 PM Boyuan Zhang <bo...@google.com> wrote:
>
>> Hi Rion,
>>
>> Let's say that you have topic with 3 partitions and what you want to do
>> is to read from these 3 partitions and each partition maintains its own
>> watermark instead of having a watermark over these 3 partitions. Do I
>> understand this correctly?
>>
>>  If so, I think you need separated pipelines. If you only want to know
>> which records come from which partitions, ReadFromKafkaDoFn emits a KV pair
>> where the KafkaSourceDescriptor is the key and KafkaRecord is the value.
>>
>> On Mon, Feb 1, 2021 at 11:01 AM Rion Williams <ri...@gmail.com>
>> wrote:
>>
>>> Hi Boyuan,
>>>
>>> Do you know if it’s possible to do something similar to this with a
>>> single topic, essentially treat records with the same keys as their own
>>> distinct pipelines. The challenge I’m encountering for splitting things
>>> downstream ends up being related to watermarking at the partition-level
>>> (via a WatermarkPolicy) and I essentially need to track watermarking or
>>> treat records with a particular key the same/independently.
>>>
>>> I’d assumed that would need to be done prior to reading from Kafka,
>>> which is where the SDF would come in.
>>>
>>> On Feb 1, 2021, at 12:48 PM, Boyuan Zhang <bo...@google.com> wrote:
>>>
>>> 
>>> Hi Rion,
>>>
>>> It sounds like ReadFromKafkaDoFn
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java>
>>> could be one of the solutions. It takes KafkaSourceDescritpor
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java>(basically
>>> it's a topic + partition) as input and emit KafkaRecords. Then your
>>> pipeline can look like:
>>> testPipeline
>>>   .apply(your source that generates KafkaSourceDescriptor)
>>>   .apply(ParDo.of(ReadFromKafkaDoFn))
>>>   .apply(other parts)
>>>
>>> On Mon, Feb 1, 2021 at 8:06 AM Rion Williams <ri...@gmail.com>
>>> wrote:
>>>
>>>> Hey all,
>>>>
>>>> I'm currently in a situation where I have a single Kafka topic with
>>>> data across multiple partitions and covers data from multiple sources. I'm
>>>> trying to see if there's a way that I'd be able to accomplish reading from
>>>> these different sources as different pipelines and if a Splittable DoFn can
>>>> do this.
>>>>
>>>> Basically - what I'd like to do is for a given key on a record, treat
>>>> this as a separate pipeline from Kafka:
>>>>
>>>> testPipeline
>>>>     .apply(
>>>>         /*
>>>>             Apply some function here to tell Kafka how to describe how to split up
>>>>             the sources that I want to read from
>>>>          */
>>>>     )
>>>>     .apply("Ready from Kafka", KafkaIO.read(...))
>>>>     .apply("Remaining Pipeline Omitted for Brevity"
>>>>
>>>> Is it possible to do this? I'm trying to avoid a major architectural
>>>> change that would require multiple separate topics by source, however if I
>>>> can guarantee that a given key (and it's associated watermark) are treated
>>>> separately, that would be ideal.
>>>>
>>>> Any advice or recommendations for a strategy that might work would be
>>>> helpful!
>>>>
>>>> Thanks,
>>>>
>>>> Rion
>>>>
>>>

Re: Separating Data from Kafka by Keying Strategy in a Kafka Splittable DoFn

Posted by Boyuan Zhang <bo...@google.com>.
Hi Rion,

Thanks for the explanation. I can see the case now. To my knowledge,
Splittable DoFn cannot help on this case and if you want watermark from
sources to be separated, I believe you have to have them in deperated
pipelines. I don't think we support per-key watermark like ifeature n one
pipeline.

On Mon, Feb 1, 2021 at 12:26 PM Rion Williams <ri...@gmail.com> wrote:

> Hi again Boyuan,
>
> Close, I believe. I'll describe the scenario a bit more specifically.
> Basically, I have a Kafka topic with 10 partitions and each of these
> contains records for various combinations of tenants and sources that come
> in interspersed across these partitions. This pipeline applies some
> windowing downstream, however I think for that to work properly the
> pipelines would need to be segregated in some fashion so data coming in for
> one tenant or source doesn't interfere with windowing for another.
>
> The pipeline itself looks like this:
>
> val pipeline = Pipeline.create(options)
>
> // Partition Events according to their data sources
> val partitionedEvents = pipeline
>     .apply("Read Events from Kafka",
>         KafkaIO
>             .read<String, Log>()
>             .withBootstrapServers(options.brokerUrl)
>             .withTopic(options.logsTopic)
>             .withKeyDeserializer(StringDeserializer::class.java)
>             .withValueDeserializerAndCoder(
>                 SpecificAvroDeserializer<Log>()::class.java,
>                 AvroCoder.of(Log::class.java)
>             )
>             .withReadCommitted()
>             .commitOffsetsInFinalize()
>             .withTimestampPolicyFactory { _, previousWatermark -> WatermarkPolicy(previousWatermark) }
>             .withConsumerConfigUpdates(
>             ImmutableMap.of<String, Any>(
>                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
>                 ConsumerConfig.GROUP_ID_CONFIG, "log-processor-pipeline",
>                 "schema.registry.url", options.schemaRegistryUrl
>             )
>         ).withoutMetadata()
>     )
>     .apply("Log Events", ParDo.of(Logs.log()))
>     .apply("Rekey Logs by Tenant", ParDo.of(Logs.key()))
>     .apply("Partition Logs by Source", Partition.of(sources.size, Events.partition<KV<String, Log>>(sources)))
>
> dataSources.forEach { dataSource ->
>     // Store a reference to the data source name to avoid serialization issues
>     val sourceName = dataSource.name
>
>     // Apply source-specific windowing strategies
>     partitionedLogs[dataSource.partition]
>         .apply("Building Windows for $sourceName", SourceSpecificWindow.of<KV<String, Log>>(dataSource))
>         .apply("Group Windowed Logs by Key for $sourceName", GroupByKey.create())
>         .apply("Log After Windowing for $sourceName", ParDo.of(Logs.logAfterWindowing()))
>         .apply(
>             "Writing Windowed Logs to Files for $sourceName",
>             FileIO.writeDynamic<String, KV<String, MutableIterable<Log>>>()
>                 .withNumShards(1)
>                 .by { row -> "${row.key}/${sourceName}" }
>                 .withDestinationCoder(StringUtf8Coder.of())
>                 .via(Contextful.fn(SerializableFunction { logs -> Files.stringify(logs.value) }), TextIO.sink())
>                 .to(options.output)
>                 .withNaming { partition -> Files.name(partition)}
>         )
> }
>
> pipeline.run().waitUntilFinish()
>
> Sorry - I know that's a lot, but in a nutshell I'm attempting to:
>
>    - Read from a multi-tenant/source topic (10 partitions)
>    - Partition those events by source
>    - Window events according to their defined source (according to
>    event-time fields within the records)
>    - Write out files on windows closing to the appropriate tenant/source
>    directory
>
> At present, it seems that because the WatermarkPolicy is only capable of
> keeping a separate watermark per partition and since it is using an
> event-time property to handle that, that multiple tenants/source
> combinations could impact others, cause windows to close unexpected/early,
> data to be missed, etc. This is why I believe that perhaps a SDF that was
> evaluated prior to reading from Kafka could allow me to treat each of these
> tenant-source pairs as separate pipelines without a major architectural
> overhaul.
>
> Is this something that an SDF might excel at or is there some other
> mechanism that I might consider to accomplish this?
>
>
>
> On Mon, Feb 1, 2021 at 1:09 PM Boyuan Zhang <bo...@google.com> wrote:
>
>> Hi Rion,
>>
>> Let's say that you have topic with 3 partitions and what you want to do
>> is to read from these 3 partitions and each partition maintains its own
>> watermark instead of having a watermark over these 3 partitions. Do I
>> understand this correctly?
>>
>>  If so, I think you need separated pipelines. If you only want to know
>> which records come from which partitions, ReadFromKafkaDoFn emits a KV pair
>> where the KafkaSourceDescriptor is the key and KafkaRecord is the value.
>>
>> On Mon, Feb 1, 2021 at 11:01 AM Rion Williams <ri...@gmail.com>
>> wrote:
>>
>>> Hi Boyuan,
>>>
>>> Do you know if it’s possible to do something similar to this with a
>>> single topic, essentially treat records with the same keys as their own
>>> distinct pipelines. The challenge I’m encountering for splitting things
>>> downstream ends up being related to watermarking at the partition-level
>>> (via a WatermarkPolicy) and I essentially need to track watermarking or
>>> treat records with a particular key the same/independently.
>>>
>>> I’d assumed that would need to be done prior to reading from Kafka,
>>> which is where the SDF would come in.
>>>
>>> On Feb 1, 2021, at 12:48 PM, Boyuan Zhang <bo...@google.com> wrote:
>>>
>>> 
>>> Hi Rion,
>>>
>>> It sounds like ReadFromKafkaDoFn
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java>
>>> could be one of the solutions. It takes KafkaSourceDescritpor
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java>(basically
>>> it's a topic + partition) as input and emit KafkaRecords. Then your
>>> pipeline can look like:
>>> testPipeline
>>>   .apply(your source that generates KafkaSourceDescriptor)
>>>   .apply(ParDo.of(ReadFromKafkaDoFn))
>>>   .apply(other parts)
>>>
>>> On Mon, Feb 1, 2021 at 8:06 AM Rion Williams <ri...@gmail.com>
>>> wrote:
>>>
>>>> Hey all,
>>>>
>>>> I'm currently in a situation where I have a single Kafka topic with
>>>> data across multiple partitions and covers data from multiple sources. I'm
>>>> trying to see if there's a way that I'd be able to accomplish reading from
>>>> these different sources as different pipelines and if a Splittable DoFn can
>>>> do this.
>>>>
>>>> Basically - what I'd like to do is for a given key on a record, treat
>>>> this as a separate pipeline from Kafka:
>>>>
>>>> testPipeline
>>>>     .apply(
>>>>         /*
>>>>             Apply some function here to tell Kafka how to describe how to split up
>>>>             the sources that I want to read from
>>>>          */
>>>>     )
>>>>     .apply("Ready from Kafka", KafkaIO.read(...))
>>>>     .apply("Remaining Pipeline Omitted for Brevity"
>>>>
>>>> Is it possible to do this? I'm trying to avoid a major architectural
>>>> change that would require multiple separate topics by source, however if I
>>>> can guarantee that a given key (and it's associated watermark) are treated
>>>> separately, that would be ideal.
>>>>
>>>> Any advice or recommendations for a strategy that might work would be
>>>> helpful!
>>>>
>>>> Thanks,
>>>>
>>>> Rion
>>>>
>>>

Re: Separating Data from Kafka by Keying Strategy in a Kafka Splittable DoFn

Posted by Rion Williams <ri...@gmail.com>.
Hi again Boyuan,

Close, I believe. I'll describe the scenario a bit more specifically.
Basically, I have a Kafka topic with 10 partitions and each of these
contains records for various combinations of tenants and sources that come
in interspersed across these partitions. This pipeline applies some
windowing downstream, however I think for that to work properly the
pipelines would need to be segregated in some fashion so data coming in for
one tenant or source doesn't interfere with windowing for another.

The pipeline itself looks like this:

val pipeline = Pipeline.create(options)

// Partition Events according to their data sources
val partitionedEvents = pipeline
    .apply("Read Events from Kafka",
        KafkaIO
            .read<String, Log>()
            .withBootstrapServers(options.brokerUrl)
            .withTopic(options.logsTopic)
            .withKeyDeserializer(StringDeserializer::class.java)
            .withValueDeserializerAndCoder(
                SpecificAvroDeserializer<Log>()::class.java,
                AvroCoder.of(Log::class.java)
            )
            .withReadCommitted()
            .commitOffsetsInFinalize()
            .withTimestampPolicyFactory { _, previousWatermark ->
WatermarkPolicy(previousWatermark) }
            .withConsumerConfigUpdates(
            ImmutableMap.of<String, Any>(
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
                ConsumerConfig.GROUP_ID_CONFIG, "log-processor-pipeline",
                "schema.registry.url", options.schemaRegistryUrl
            )
        ).withoutMetadata()
    )
    .apply("Log Events", ParDo.of(Logs.log()))
    .apply("Rekey Logs by Tenant", ParDo.of(Logs.key()))
    .apply("Partition Logs by Source", Partition.of(sources.size,
Events.partition<KV<String, Log>>(sources)))

dataSources.forEach { dataSource ->
    // Store a reference to the data source name to avoid serialization issues
    val sourceName = dataSource.name

    // Apply source-specific windowing strategies
    partitionedLogs[dataSource.partition]
        .apply("Building Windows for $sourceName",
SourceSpecificWindow.of<KV<String, Log>>(dataSource))
        .apply("Group Windowed Logs by Key for $sourceName",
GroupByKey.create())
        .apply("Log After Windowing for $sourceName",
ParDo.of(Logs.logAfterWindowing()))
        .apply(
            "Writing Windowed Logs to Files for $sourceName",
            FileIO.writeDynamic<String, KV<String, MutableIterable<Log>>>()
                .withNumShards(1)
                .by { row -> "${row.key}/${sourceName}" }
                .withDestinationCoder(StringUtf8Coder.of())
                .via(Contextful.fn(SerializableFunction { logs ->
Files.stringify(logs.value) }), TextIO.sink())
                .to(options.output)
                .withNaming { partition -> Files.name(partition)}
        )
}

pipeline.run().waitUntilFinish()

Sorry - I know that's a lot, but in a nutshell I'm attempting to:

   - Read from a multi-tenant/source topic (10 partitions)
   - Partition those events by source
   - Window events according to their defined source (according to
   event-time fields within the records)
   - Write out files on windows closing to the appropriate tenant/source
   directory

At present, it seems that because the WatermarkPolicy is only capable of
keeping a separate watermark per partition and since it is using an
event-time property to handle that, that multiple tenants/source
combinations could impact others, cause windows to close unexpected/early,
data to be missed, etc. This is why I believe that perhaps a SDF that was
evaluated prior to reading from Kafka could allow me to treat each of these
tenant-source pairs as separate pipelines without a major architectural
overhaul.

Is this something that an SDF might excel at or is there some other
mechanism that I might consider to accomplish this?



On Mon, Feb 1, 2021 at 1:09 PM Boyuan Zhang <bo...@google.com> wrote:

> Hi Rion,
>
> Let's say that you have topic with 3 partitions and what you want to do is
> to read from these 3 partitions and each partition maintains its own
> watermark instead of having a watermark over these 3 partitions. Do I
> understand this correctly?
>
>  If so, I think you need separated pipelines. If you only want to know
> which records come from which partitions, ReadFromKafkaDoFn emits a KV pair
> where the KafkaSourceDescriptor is the key and KafkaRecord is the value.
>
> On Mon, Feb 1, 2021 at 11:01 AM Rion Williams <ri...@gmail.com>
> wrote:
>
>> Hi Boyuan,
>>
>> Do you know if it’s possible to do something similar to this with a
>> single topic, essentially treat records with the same keys as their own
>> distinct pipelines. The challenge I’m encountering for splitting things
>> downstream ends up being related to watermarking at the partition-level
>> (via a WatermarkPolicy) and I essentially need to track watermarking or
>> treat records with a particular key the same/independently.
>>
>> I’d assumed that would need to be done prior to reading from Kafka, which
>> is where the SDF would come in.
>>
>> On Feb 1, 2021, at 12:48 PM, Boyuan Zhang <bo...@google.com> wrote:
>>
>> 
>> Hi Rion,
>>
>> It sounds like ReadFromKafkaDoFn
>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java>
>> could be one of the solutions. It takes KafkaSourceDescritpor
>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java>(basically
>> it's a topic + partition) as input and emit KafkaRecords. Then your
>> pipeline can look like:
>> testPipeline
>>   .apply(your source that generates KafkaSourceDescriptor)
>>   .apply(ParDo.of(ReadFromKafkaDoFn))
>>   .apply(other parts)
>>
>> On Mon, Feb 1, 2021 at 8:06 AM Rion Williams <ri...@gmail.com>
>> wrote:
>>
>>> Hey all,
>>>
>>> I'm currently in a situation where I have a single Kafka topic with data
>>> across multiple partitions and covers data from multiple sources. I'm
>>> trying to see if there's a way that I'd be able to accomplish reading from
>>> these different sources as different pipelines and if a Splittable DoFn can
>>> do this.
>>>
>>> Basically - what I'd like to do is for a given key on a record, treat
>>> this as a separate pipeline from Kafka:
>>>
>>> testPipeline
>>>     .apply(
>>>         /*
>>>             Apply some function here to tell Kafka how to describe how to split up
>>>             the sources that I want to read from
>>>          */
>>>     )
>>>     .apply("Ready from Kafka", KafkaIO.read(...))
>>>     .apply("Remaining Pipeline Omitted for Brevity"
>>>
>>> Is it possible to do this? I'm trying to avoid a major architectural
>>> change that would require multiple separate topics by source, however if I
>>> can guarantee that a given key (and it's associated watermark) are treated
>>> separately, that would be ideal.
>>>
>>> Any advice or recommendations for a strategy that might work would be
>>> helpful!
>>>
>>> Thanks,
>>>
>>> Rion
>>>
>>

Re: Separating Data from Kafka by Keying Strategy in a Kafka Splittable DoFn

Posted by Rion Williams <ri...@gmail.com>.
Hi again Boyuan,

Close, I believe. I'll describe the scenario a bit more specifically.
Basically, I have a Kafka topic with 10 partitions and each of these
contains records for various combinations of tenants and sources that come
in interspersed across these partitions. This pipeline applies some
windowing downstream, however I think for that to work properly the
pipelines would need to be segregated in some fashion so data coming in for
one tenant or source doesn't interfere with windowing for another.

The pipeline itself looks like this:

val pipeline = Pipeline.create(options)

// Partition Events according to their data sources
val partitionedEvents = pipeline
    .apply("Read Events from Kafka",
        KafkaIO
            .read<String, Log>()
            .withBootstrapServers(options.brokerUrl)
            .withTopic(options.logsTopic)
            .withKeyDeserializer(StringDeserializer::class.java)
            .withValueDeserializerAndCoder(
                SpecificAvroDeserializer<Log>()::class.java,
                AvroCoder.of(Log::class.java)
            )
            .withReadCommitted()
            .commitOffsetsInFinalize()
            .withTimestampPolicyFactory { _, previousWatermark ->
WatermarkPolicy(previousWatermark) }
            .withConsumerConfigUpdates(
            ImmutableMap.of<String, Any>(
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
                ConsumerConfig.GROUP_ID_CONFIG, "log-processor-pipeline",
                "schema.registry.url", options.schemaRegistryUrl
            )
        ).withoutMetadata()
    )
    .apply("Log Events", ParDo.of(Logs.log()))
    .apply("Rekey Logs by Tenant", ParDo.of(Logs.key()))
    .apply("Partition Logs by Source", Partition.of(sources.size,
Events.partition<KV<String, Log>>(sources)))

dataSources.forEach { dataSource ->
    // Store a reference to the data source name to avoid serialization issues
    val sourceName = dataSource.name

    // Apply source-specific windowing strategies
    partitionedLogs[dataSource.partition]
        .apply("Building Windows for $sourceName",
SourceSpecificWindow.of<KV<String, Log>>(dataSource))
        .apply("Group Windowed Logs by Key for $sourceName",
GroupByKey.create())
        .apply("Log After Windowing for $sourceName",
ParDo.of(Logs.logAfterWindowing()))
        .apply(
            "Writing Windowed Logs to Files for $sourceName",
            FileIO.writeDynamic<String, KV<String, MutableIterable<Log>>>()
                .withNumShards(1)
                .by { row -> "${row.key}/${sourceName}" }
                .withDestinationCoder(StringUtf8Coder.of())
                .via(Contextful.fn(SerializableFunction { logs ->
Files.stringify(logs.value) }), TextIO.sink())
                .to(options.output)
                .withNaming { partition -> Files.name(partition)}
        )
}

pipeline.run().waitUntilFinish()

Sorry - I know that's a lot, but in a nutshell I'm attempting to:

   - Read from a multi-tenant/source topic (10 partitions)
   - Partition those events by source
   - Window events according to their defined source (according to
   event-time fields within the records)
   - Write out files on windows closing to the appropriate tenant/source
   directory

At present, it seems that because the WatermarkPolicy is only capable of
keeping a separate watermark per partition and since it is using an
event-time property to handle that, that multiple tenants/source
combinations could impact others, cause windows to close unexpected/early,
data to be missed, etc. This is why I believe that perhaps a SDF that was
evaluated prior to reading from Kafka could allow me to treat each of these
tenant-source pairs as separate pipelines without a major architectural
overhaul.

Is this something that an SDF might excel at or is there some other
mechanism that I might consider to accomplish this?



On Mon, Feb 1, 2021 at 1:09 PM Boyuan Zhang <bo...@google.com> wrote:

> Hi Rion,
>
> Let's say that you have topic with 3 partitions and what you want to do is
> to read from these 3 partitions and each partition maintains its own
> watermark instead of having a watermark over these 3 partitions. Do I
> understand this correctly?
>
>  If so, I think you need separated pipelines. If you only want to know
> which records come from which partitions, ReadFromKafkaDoFn emits a KV pair
> where the KafkaSourceDescriptor is the key and KafkaRecord is the value.
>
> On Mon, Feb 1, 2021 at 11:01 AM Rion Williams <ri...@gmail.com>
> wrote:
>
>> Hi Boyuan,
>>
>> Do you know if it’s possible to do something similar to this with a
>> single topic, essentially treat records with the same keys as their own
>> distinct pipelines. The challenge I’m encountering for splitting things
>> downstream ends up being related to watermarking at the partition-level
>> (via a WatermarkPolicy) and I essentially need to track watermarking or
>> treat records with a particular key the same/independently.
>>
>> I’d assumed that would need to be done prior to reading from Kafka, which
>> is where the SDF would come in.
>>
>> On Feb 1, 2021, at 12:48 PM, Boyuan Zhang <bo...@google.com> wrote:
>>
>> 
>> Hi Rion,
>>
>> It sounds like ReadFromKafkaDoFn
>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java>
>> could be one of the solutions. It takes KafkaSourceDescritpor
>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java>(basically
>> it's a topic + partition) as input and emit KafkaRecords. Then your
>> pipeline can look like:
>> testPipeline
>>   .apply(your source that generates KafkaSourceDescriptor)
>>   .apply(ParDo.of(ReadFromKafkaDoFn))
>>   .apply(other parts)
>>
>> On Mon, Feb 1, 2021 at 8:06 AM Rion Williams <ri...@gmail.com>
>> wrote:
>>
>>> Hey all,
>>>
>>> I'm currently in a situation where I have a single Kafka topic with data
>>> across multiple partitions and covers data from multiple sources. I'm
>>> trying to see if there's a way that I'd be able to accomplish reading from
>>> these different sources as different pipelines and if a Splittable DoFn can
>>> do this.
>>>
>>> Basically - what I'd like to do is for a given key on a record, treat
>>> this as a separate pipeline from Kafka:
>>>
>>> testPipeline
>>>     .apply(
>>>         /*
>>>             Apply some function here to tell Kafka how to describe how to split up
>>>             the sources that I want to read from
>>>          */
>>>     )
>>>     .apply("Ready from Kafka", KafkaIO.read(...))
>>>     .apply("Remaining Pipeline Omitted for Brevity"
>>>
>>> Is it possible to do this? I'm trying to avoid a major architectural
>>> change that would require multiple separate topics by source, however if I
>>> can guarantee that a given key (and it's associated watermark) are treated
>>> separately, that would be ideal.
>>>
>>> Any advice or recommendations for a strategy that might work would be
>>> helpful!
>>>
>>> Thanks,
>>>
>>> Rion
>>>
>>

Re: Separating Data from Kafka by Keying Strategy in a Kafka Splittable DoFn

Posted by Boyuan Zhang <bo...@google.com>.
Hi Rion,

Let's say that you have topic with 3 partitions and what you want to do is
to read from these 3 partitions and each partition maintains its own
watermark instead of having a watermark over these 3 partitions. Do I
understand this correctly?

 If so, I think you need separated pipelines. If you only want to know
which records come from which partitions, ReadFromKafkaDoFn emits a KV pair
where the KafkaSourceDescriptor is the key and KafkaRecord is the value.

On Mon, Feb 1, 2021 at 11:01 AM Rion Williams <ri...@gmail.com> wrote:

> Hi Boyuan,
>
> Do you know if it’s possible to do something similar to this with a single
> topic, essentially treat records with the same keys as their own distinct
> pipelines. The challenge I’m encountering for splitting things downstream
> ends up being related to watermarking at the partition-level (via a
> WatermarkPolicy) and I essentially need to track watermarking or treat
> records with a particular key the same/independently.
>
> I’d assumed that would need to be done prior to reading from Kafka, which
> is where the SDF would come in.
>
> On Feb 1, 2021, at 12:48 PM, Boyuan Zhang <bo...@google.com> wrote:
>
> 
> Hi Rion,
>
> It sounds like ReadFromKafkaDoFn
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java>
> could be one of the solutions. It takes KafkaSourceDescritpor
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java>(basically
> it's a topic + partition) as input and emit KafkaRecords. Then your
> pipeline can look like:
> testPipeline
>   .apply(your source that generates KafkaSourceDescriptor)
>   .apply(ParDo.of(ReadFromKafkaDoFn))
>   .apply(other parts)
>
> On Mon, Feb 1, 2021 at 8:06 AM Rion Williams <ri...@gmail.com>
> wrote:
>
>> Hey all,
>>
>> I'm currently in a situation where I have a single Kafka topic with data
>> across multiple partitions and covers data from multiple sources. I'm
>> trying to see if there's a way that I'd be able to accomplish reading from
>> these different sources as different pipelines and if a Splittable DoFn can
>> do this.
>>
>> Basically - what I'd like to do is for a given key on a record, treat
>> this as a separate pipeline from Kafka:
>>
>> testPipeline
>>     .apply(
>>         /*
>>             Apply some function here to tell Kafka how to describe how to split up
>>             the sources that I want to read from
>>          */
>>     )
>>     .apply("Ready from Kafka", KafkaIO.read(...))
>>     .apply("Remaining Pipeline Omitted for Brevity"
>>
>> Is it possible to do this? I'm trying to avoid a major architectural
>> change that would require multiple separate topics by source, however if I
>> can guarantee that a given key (and it's associated watermark) are treated
>> separately, that would be ideal.
>>
>> Any advice or recommendations for a strategy that might work would be
>> helpful!
>>
>> Thanks,
>>
>> Rion
>>
>

Re: Separating Data from Kafka by Keying Strategy in a Kafka Splittable DoFn

Posted by Boyuan Zhang <bo...@google.com>.
Hi Rion,

Let's say that you have topic with 3 partitions and what you want to do is
to read from these 3 partitions and each partition maintains its own
watermark instead of having a watermark over these 3 partitions. Do I
understand this correctly?

 If so, I think you need separated pipelines. If you only want to know
which records come from which partitions, ReadFromKafkaDoFn emits a KV pair
where the KafkaSourceDescriptor is the key and KafkaRecord is the value.

On Mon, Feb 1, 2021 at 11:01 AM Rion Williams <ri...@gmail.com> wrote:

> Hi Boyuan,
>
> Do you know if it’s possible to do something similar to this with a single
> topic, essentially treat records with the same keys as their own distinct
> pipelines. The challenge I’m encountering for splitting things downstream
> ends up being related to watermarking at the partition-level (via a
> WatermarkPolicy) and I essentially need to track watermarking or treat
> records with a particular key the same/independently.
>
> I’d assumed that would need to be done prior to reading from Kafka, which
> is where the SDF would come in.
>
> On Feb 1, 2021, at 12:48 PM, Boyuan Zhang <bo...@google.com> wrote:
>
> 
> Hi Rion,
>
> It sounds like ReadFromKafkaDoFn
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java>
> could be one of the solutions. It takes KafkaSourceDescritpor
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java>(basically
> it's a topic + partition) as input and emit KafkaRecords. Then your
> pipeline can look like:
> testPipeline
>   .apply(your source that generates KafkaSourceDescriptor)
>   .apply(ParDo.of(ReadFromKafkaDoFn))
>   .apply(other parts)
>
> On Mon, Feb 1, 2021 at 8:06 AM Rion Williams <ri...@gmail.com>
> wrote:
>
>> Hey all,
>>
>> I'm currently in a situation where I have a single Kafka topic with data
>> across multiple partitions and covers data from multiple sources. I'm
>> trying to see if there's a way that I'd be able to accomplish reading from
>> these different sources as different pipelines and if a Splittable DoFn can
>> do this.
>>
>> Basically - what I'd like to do is for a given key on a record, treat
>> this as a separate pipeline from Kafka:
>>
>> testPipeline
>>     .apply(
>>         /*
>>             Apply some function here to tell Kafka how to describe how to split up
>>             the sources that I want to read from
>>          */
>>     )
>>     .apply("Ready from Kafka", KafkaIO.read(...))
>>     .apply("Remaining Pipeline Omitted for Brevity"
>>
>> Is it possible to do this? I'm trying to avoid a major architectural
>> change that would require multiple separate topics by source, however if I
>> can guarantee that a given key (and it's associated watermark) are treated
>> separately, that would be ideal.
>>
>> Any advice or recommendations for a strategy that might work would be
>> helpful!
>>
>> Thanks,
>>
>> Rion
>>
>

Re: Separating Data from Kafka by Keying Strategy in a Kafka Splittable DoFn

Posted by Rion Williams <ri...@gmail.com>.
Hi Boyuan,

Do you know if it’s possible to do something similar to this with a single topic, essentially treat records with the same keys as their own distinct pipelines. The challenge I’m encountering for splitting things downstream ends up being related to watermarking at the partition-level (via a WatermarkPolicy) and I essentially need to track watermarking or treat records with a particular key the same/independently.

I’d assumed that would need to be done prior to reading from Kafka, which is where the SDF would come in.

> On Feb 1, 2021, at 12:48 PM, Boyuan Zhang <bo...@google.com> wrote:
> 
> 
> Hi Rion,
> 
> It sounds like ReadFromKafkaDoFn could be one of the solutions. It takes KafkaSourceDescritpor(basically it's a topic + partition) as input and emit KafkaRecords. Then your pipeline can look like:
> testPipeline
>   .apply(your source that generates KafkaSourceDescriptor)
>   .apply(ParDo.of(ReadFromKafkaDoFn))
>   .apply(other parts)
> 
>> On Mon, Feb 1, 2021 at 8:06 AM Rion Williams <ri...@gmail.com> wrote:
>> Hey all,
>> 
>> I'm currently in a situation where I have a single Kafka topic with data across multiple partitions and covers data from multiple sources. I'm trying to see if there's a way that I'd be able to accomplish reading from these different sources as different pipelines and if a Splittable DoFn can do this.
>> 
>> Basically - what I'd like to do is for a given key on a record, treat this as a separate pipeline from Kafka:
>> testPipeline
>>     .apply(
>>         /*
>>             Apply some function here to tell Kafka how to describe how to split up
>>             the sources that I want to read from
>>          */
>>     )
>>     .apply("Ready from Kafka", KafkaIO.read(...))
>>     .apply("Remaining Pipeline Omitted for Brevity"
>> Is it possible to do this? I'm trying to avoid a major architectural change that would require multiple separate topics by source, however if I can guarantee that a given key (and it's associated watermark) are treated separately, that would be ideal.
>> 
>> Any advice or recommendations for a strategy that might work would be helpful!
>> 
>> Thanks,
>> 
>> Rion

Re: Separating Data from Kafka by Keying Strategy in a Kafka Splittable DoFn

Posted by Rion Williams <ri...@gmail.com>.
Hi Boyuan,

Do you know if it’s possible to do something similar to this with a single topic, essentially treat records with the same keys as their own distinct pipelines. The challenge I’m encountering for splitting things downstream ends up being related to watermarking at the partition-level (via a WatermarkPolicy) and I essentially need to track watermarking or treat records with a particular key the same/independently.

I’d assumed that would need to be done prior to reading from Kafka, which is where the SDF would come in.

> On Feb 1, 2021, at 12:48 PM, Boyuan Zhang <bo...@google.com> wrote:
> 
> 
> Hi Rion,
> 
> It sounds like ReadFromKafkaDoFn could be one of the solutions. It takes KafkaSourceDescritpor(basically it's a topic + partition) as input and emit KafkaRecords. Then your pipeline can look like:
> testPipeline
>   .apply(your source that generates KafkaSourceDescriptor)
>   .apply(ParDo.of(ReadFromKafkaDoFn))
>   .apply(other parts)
> 
>> On Mon, Feb 1, 2021 at 8:06 AM Rion Williams <ri...@gmail.com> wrote:
>> Hey all,
>> 
>> I'm currently in a situation where I have a single Kafka topic with data across multiple partitions and covers data from multiple sources. I'm trying to see if there's a way that I'd be able to accomplish reading from these different sources as different pipelines and if a Splittable DoFn can do this.
>> 
>> Basically - what I'd like to do is for a given key on a record, treat this as a separate pipeline from Kafka:
>> testPipeline
>>     .apply(
>>         /*
>>             Apply some function here to tell Kafka how to describe how to split up
>>             the sources that I want to read from
>>          */
>>     )
>>     .apply("Ready from Kafka", KafkaIO.read(...))
>>     .apply("Remaining Pipeline Omitted for Brevity"
>> Is it possible to do this? I'm trying to avoid a major architectural change that would require multiple separate topics by source, however if I can guarantee that a given key (and it's associated watermark) are treated separately, that would be ideal.
>> 
>> Any advice or recommendations for a strategy that might work would be helpful!
>> 
>> Thanks,
>> 
>> Rion

Re: Separating Data from Kafka by Keying Strategy in a Kafka Splittable DoFn

Posted by Boyuan Zhang <bo...@google.com>.
Hi Rion,

It sounds like ReadFromKafkaDoFn
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java>
could be one of the solutions. It takes KafkaSourceDescritpor
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java>(basically
it's a topic + partition) as input and emit KafkaRecords. Then your
pipeline can look like:
testPipeline
  .apply(your source that generates KafkaSourceDescriptor)
  .apply(ParDo.of(ReadFromKafkaDoFn))
  .apply(other parts)

On Mon, Feb 1, 2021 at 8:06 AM Rion Williams <ri...@gmail.com> wrote:

> Hey all,
>
> I'm currently in a situation where I have a single Kafka topic with data
> across multiple partitions and covers data from multiple sources. I'm
> trying to see if there's a way that I'd be able to accomplish reading from
> these different sources as different pipelines and if a Splittable DoFn can
> do this.
>
> Basically - what I'd like to do is for a given key on a record, treat this
> as a separate pipeline from Kafka:
>
> testPipeline
>     .apply(
>         /*
>             Apply some function here to tell Kafka how to describe how to split up
>             the sources that I want to read from
>          */
>     )
>     .apply("Ready from Kafka", KafkaIO.read(...))
>     .apply("Remaining Pipeline Omitted for Brevity"
>
> Is it possible to do this? I'm trying to avoid a major architectural
> change that would require multiple separate topics by source, however if I
> can guarantee that a given key (and it's associated watermark) are treated
> separately, that would be ideal.
>
> Any advice or recommendations for a strategy that might work would be
> helpful!
>
> Thanks,
>
> Rion
>

Re: Separating Data from Kafka by Keying Strategy in a Kafka Splittable DoFn

Posted by Boyuan Zhang <bo...@google.com>.
Hi Rion,

It sounds like ReadFromKafkaDoFn
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java>
could be one of the solutions. It takes KafkaSourceDescritpor
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java>(basically
it's a topic + partition) as input and emit KafkaRecords. Then your
pipeline can look like:
testPipeline
  .apply(your source that generates KafkaSourceDescriptor)
  .apply(ParDo.of(ReadFromKafkaDoFn))
  .apply(other parts)

On Mon, Feb 1, 2021 at 8:06 AM Rion Williams <ri...@gmail.com> wrote:

> Hey all,
>
> I'm currently in a situation where I have a single Kafka topic with data
> across multiple partitions and covers data from multiple sources. I'm
> trying to see if there's a way that I'd be able to accomplish reading from
> these different sources as different pipelines and if a Splittable DoFn can
> do this.
>
> Basically - what I'd like to do is for a given key on a record, treat this
> as a separate pipeline from Kafka:
>
> testPipeline
>     .apply(
>         /*
>             Apply some function here to tell Kafka how to describe how to split up
>             the sources that I want to read from
>          */
>     )
>     .apply("Ready from Kafka", KafkaIO.read(...))
>     .apply("Remaining Pipeline Omitted for Brevity"
>
> Is it possible to do this? I'm trying to avoid a major architectural
> change that would require multiple separate topics by source, however if I
> can guarantee that a given key (and it's associated watermark) are treated
> separately, that would be ideal.
>
> Any advice or recommendations for a strategy that might work would be
> helpful!
>
> Thanks,
>
> Rion
>