Posted to dev@beam.apache.org by Rion Williams <ri...@gmail.com> on 2021/02/08 22:09:58 UTC

Unit Testing Kafka in Apache Beam

Hey folks,

I’ve been fleshing out a proof-of-concept pipeline that deals with out-of-order data (i.e. mismatched processing times and event times) and does quite a bit of windowing depending on the data. Most of the work I’ve done in streaming systems relies heavily on well-written tests, both unit and integration. Beam’s existing constructs like TestStream are incredible; however, I’ve been trying to more closely mimic a production pipeline.

I’ve been using kafka-junit [1], which has worked incredibly well, and the KafkaIO source consumes from it as expected. However, it requires that the data be injected into the topic before the pipeline starts:

```
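// Records are written to the topic before the pipeline starts
injectIntoTopic(...)

pipeline
    .apply(
        KafkaIO.read(...)
    )

// The pipeline then consumes everything already in the topic at once
pipeline.run()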
```

As a result, all of the data is consumed at once, so allowed lateness never comes into play the way I would expect. So my question is this:

Is it possible to create a unit test to send records to a Kafka topic with some given (even artificial) delay so that I could verify things like allowed lateness within a pipeline?

Essentially, I’d like something like a TestStream, with all of its notions of “advanceProcessingTime”, combined with inserting those delayed records into Kafka.

Does that make sense? Or should I just rely on a TestStream and circumvent Kafka entirely? That doesn’t seem like the right thing to do, but I don’t currently see a way around it (I’ve tried defining timestamps at the ProducerRecord level, adding sleeps between records, etc.).

Any advice would be appreciated!

[1] : https://github.com/salesforce/kafka-junit

Re: Unit Testing Kafka in Apache Beam

Posted by Brian Hulette <bh...@google.com>.
Ah this is getting trickier :)

In the Cloud PubSub tests that I'm referring to, we don't use PAssert.
Instead, we inject some messages into one PubSub topic and write the
output to another PubSub topic that we can then verify. We're also not
doing any verification of event-time semantics.
This approach might work for you, but you won't have any direct control
over processing time, which I'd imagine could lead to flakiness.

I think the right thing to do here would be to verify the event-time
semantics/allowed lateness using TestStream, circumventing KafkaIO. If
you want an end-to-end test that includes KafkaIO, you might just add
another test that verifies the full pipeline with a simpler input.

Brian
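
The event-time check suggested above can be modeled without any Beam dependency. The following is a minimal plain-Java sketch, not Beam's TestStream API: the class ScriptedStreamSketch, its methods, the one-minute fixed windows, and the 30-second allowed lateness are all assumptions made for illustration. It replays a script of elements and watermark advances and drops an element once the watermark has passed the end of its window plus the allowed lateness, which is the behavior a TestStream-based test would exercise.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a scripted stream: elements interleaved with watermark advances. */
final class ScriptedStreamSketch {
    static final long WINDOW_MS = 60_000;           // assumed: fixed one-minute windows
    static final long ALLOWED_LATENESS_MS = 30_000; // assumed: 30 s allowed lateness

    long watermark = Long.MIN_VALUE;
    final Map<Long, Integer> countsByWindowStart = new TreeMap<>();
    final List<String> dropped = new ArrayList<>();

    /** Adds one element with an explicit event time, or drops it if its window has expired. */
    ScriptedStreamSketch element(String value, long eventTimeMs) {
        long windowStart = Math.floorDiv(eventTimeMs, WINDOW_MS) * WINDOW_MS;
        long maxTimestamp = windowStart + WINDOW_MS - 1;
        // A window expires once the watermark passes its last timestamp plus the allowed lateness.
        if (watermark > maxTimestamp + ALLOWED_LATENESS_MS) {
            dropped.add(value);
        } else {
            countsByWindowStart.merge(windowStart, 1, Integer::sum);
        }
        return this;
    }

    /** Moves the watermark forward; it never moves backwards. */
    ScriptedStreamSketch advanceWatermarkTo(long newWatermarkMs) {
        watermark = Math.max(watermark, newWatermarkMs);
        return this;
    }

    public static void main(String[] args) {
        ScriptedStreamSketch s = new ScriptedStreamSketch()
            .element("a", 10_000)         // on time, window [0, 60000)
            .advanceWatermarkTo(70_000)   // window closed, still within allowed lateness
            .element("late-ok", 20_000)   // late but accepted
            .advanceWatermarkTo(100_000)  // window [0, 60000) has now expired
            .element("too-late", 30_000)  // dropped
            .element("b", 110_000);       // on time, window [60000, 120000)
        System.out.println(s.countsByWindowStart); // {0=2, 60000=1}
        System.out.println(s.dropped);             // [too-late]
    }
}
```

In a real test, the same script would be expressed with TestStream (elements, watermark advances, processing-time advances) and the per-window assertions with PAssert, as suggested in the reply above.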

On Tue, Feb 9, 2021 at 9:15 AM Rion Williams <ri...@gmail.com> wrote:

> Hey Brian,
>
> So that’s an approach that was thinking about last night and I’m sure it
> could work (I.e. running the pipeline and shortly after sending through
> messages). I’m assuming that as long as the pipeline is running, I should
> be able to apply a PAssert against it to verify that something happened
> after injecting the data and passing/failing the test.
>
> Another question that seems to be baffling me is - is it possible to
> simulate processing time advances using Kafka in a scenario like this? I
> know that I can use a TestStream with its “advanceProcessingTime()”
> constructs to simulate this in a stream, but doing so in a test scenario
> would circumvent Kafka entirely which seems to defeat the purpose of the
> test.
>
> Ideally what I’d want to accomplish is:
> - define a series of records with appropriate timestamps to send through
> my pipeline
> - window against those, verifying allowed lateness is applied/messages
> dropped accordingly
> - assert against the output of the windows
>
> Thanks so much for the response, I do appreciate it.
>
> On Feb 9, 2021, at 11:08 AM, Brian Hulette <bh...@google.com> wrote:
>
> 
> Hi Rion,
>
> Can you run the pipeline asynchronously and inject messages after it has
> started? We use this approach for some tests against Cloud PubSub.
> Note if using the DirectRunner you need to set the blockOnRun pipeline
> option to False to do this.
>
> Brian
>
> On Mon, Feb 8, 2021 at 2:10 PM Rion Williams <ri...@gmail.com>
> wrote:
>
>> Hey folks,
>>
>> I’ve been working on fleshing out a proof-of-concept pipeline that deals
>> with some out of order data (I.e. mismatching processing times /
>> event-times) and does quite a bit of windowing depending on the data. Most
>> of the work I‘ve done in a lot of streaming systems relies heavily on
>> well-written tests, both unit and integration. Beam’s existing constructs
>> like TestStreams are incredible, however I’ve been trying to more closely
>> mimic a production pipeline.
>>
>> I’ve been using kafka-junit [1] and it’s worked incredibly well and can
>> be consumed by the KafkaIO source as expected. This works great, however it
>> requires that the data is injected into the topic prior to starting the
>> pipeline:
>>
>> ```
>> injectIntoTopic(...)
>>
>> pipeline
>>     .apply(
>>         KafkaIO.read(...)
>>     )
>>
>> pipeline.run()
>> ```
>>
>> This results in all of the data being consumed at once and thus not
>> treating allowed lateness like I would imagine. So my question is this:
>>
>> Is it possible to create a unit test to send records to a Kafka topic
>> with some given (even artificial) delay so that I could verify things like
>> allowed lateness within a pipeline?
>>
>> Essentially like using a TestStream with all of its notions of
>> “advanceProcessingTime” in conjunction with inserting those delayed records
>> into Kafka.
>>
>> Does that make sense? Or should I just rely on the use of a TestStream
>> and circumvent Kafka entirely? It doesn’t seem like the right thing to do,
>> but I don’t currently see a way to work around this (I’ve tried defining
>> timestamps at the ProducerRecord level, adding sleeps between records, etc.)
>>
>> Any advice would be appreciated!
>>
>> [1] : https://github.com/salesforce/kafka-junit
>>
>


Re: Unit Testing Kafka in Apache Beam

Posted by Rion Williams <ri...@gmail.com>.
Hey Brian,

So that’s an approach that I was thinking about last night, and I’m sure it could work (i.e. running the pipeline and sending messages through shortly afterwards). I’m assuming that as long as the pipeline is running, I should be able to apply a PAssert against it to verify that something happened after injecting the data, and pass or fail the test accordingly.

Another question that is baffling me: is it possible to simulate processing-time advances using Kafka in a scenario like this? I know that I can use a TestStream with its “advanceProcessingTime()” constructs to simulate this in a stream, but doing so in a test scenario would circumvent Kafka entirely, which seems to defeat the purpose of the test.

Ideally what I’d want to accomplish is:
- define a series of records with appropriate timestamps to send through my pipeline
- window against those, verifying allowed lateness is applied/messages dropped accordingly
- assert against the output of the windows

Thanks so much for the response, I do appreciate it.

> On Feb 9, 2021, at 11:08 AM, Brian Hulette <bh...@google.com> wrote:
> 
> 
> Hi Rion,
> 
> Can you run the pipeline asynchronously and inject messages after it has started? We use this approach for some tests against Cloud PubSub. 
> Note if using the DirectRunner you need to set the blockOnRun pipeline option to False to do this.
> 
> Brian
> 
>> On Mon, Feb 8, 2021 at 2:10 PM Rion Williams <ri...@gmail.com> wrote:
>> Hey folks,
>> 
>> I’ve been working on fleshing out a proof-of-concept pipeline that deals with some out of order data (I.e. mismatching processing times / event-times) and does quite a bit of windowing depending on the data. Most of the work I‘ve done in a lot of streaming systems relies heavily on well-written tests, both unit and integration. Beam’s existing constructs like TestStreams are incredible, however I’ve been trying to more closely mimic a production pipeline. 
>> 
>> I’ve been using kafka-junit [1] and it’s worked incredibly well and can be consumed by the KafkaIO source as expected. This works great, however it requires that the data is injected into the topic prior to starting the pipeline:
>> 
>> ```
>> injectIntoTopic(...)
>> 
>> pipeline
>>     .apply(
>>         KafkaIO.read(...)
>>     )
>> 
>> pipeline.run()
>> ```
>> 
>> This results in all of the data being consumed at once and thus not treating allowed lateness like I would imagine. So my question is this:
>> 
>> Is it possible to create a unit test to send records to a Kafka topic with some given (even artificial) delay so that I could verify things like allowed lateness within a pipeline?
>> 
>> Essentially like using a TestStream with all of its notions of “advanceProcessingTime” in conjunction with inserting those delayed records into Kafka.
>> 
>> Does that make sense? Or should I just rely on the use of a TestStream and circumvent Kafka entirely? It doesn’t seem like the right thing to do, but I don’t currently see a way to work around this (I’ve tried defining timestamps at the ProducerRecord level, adding sleeps between records, etc.)
>> 
>> Any advice would be appreciated!
>> 
>> [1] : https://github.com/salesforce/kafka-junit


Re: Unit Testing Kafka in Apache Beam

Posted by Brian Hulette <bh...@google.com>.
Hi Rion,

Can you run the pipeline asynchronously and inject messages after it has
started? We use this approach for some tests against Cloud PubSub.
Note that if you are using the DirectRunner, you need to set the
blockOnRun pipeline option to false to do this.

Brian
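
The shape of the "start asynchronously, then inject" approach can be sketched in plain Java. This is an illustration under stated assumptions, not Beam or Kafka code: AsyncInjectSketch, runAndInject, and the END marker are hypothetical names, a BlockingQueue stands in for the topic, and a background task stands in for the pipeline. With Beam's DirectRunner, the non-blocking start would instead come from setting blockOnRun to false (DirectOptions.setBlockOnRun(false) in the Java SDK) before calling pipeline.run().

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AsyncInjectSketch {
    static final String END = "__end__"; // hypothetical end-of-test marker

    /** Starts a background consumer, then injects records with a delay between them. */
    static List<String> runAndInject(List<String> records, long delayMs) {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>(); // stand-in for the input topic
        List<String> output = new CopyOnWriteArrayList<>();        // stand-in for the output topic

        // The "pipeline" starts without blocking the test thread.
        CompletableFuture<Void> pipeline = CompletableFuture.runAsync(() -> {
            try {
                for (String r = topic.take(); !r.equals(END); r = topic.take()) {
                    output.add(r.toUpperCase());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        try {
            // Inject only after the consumer is running, pausing between records.
            for (String record : records) {
                topic.put(record);
                TimeUnit.MILLISECONDS.sleep(delayMs);
            }
            topic.put(END);
            pipeline.get(5, TimeUnit.SECONDS); // time out rather than hang the test
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(runAndInject(List.of("a", "b", "c"), 50)); // [A, B, C]
    }
}
```

The delays here are wall-clock sleeps, so a test built this way still has no direct control over processing time; it only controls when records become visible to the consumer.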

On Mon, Feb 8, 2021 at 2:10 PM Rion Williams <ri...@gmail.com> wrote:

> Hey folks,
>
> I’ve been working on fleshing out a proof-of-concept pipeline that deals
> with some out of order data (I.e. mismatching processing times /
> event-times) and does quite a bit of windowing depending on the data. Most
> of the work I‘ve done in a lot of streaming systems relies heavily on
> well-written tests, both unit and integration. Beam’s existing constructs
> like TestStreams are incredible, however I’ve been trying to more closely
> mimic a production pipeline.
>
> I’ve been using kafka-junit [1] and it’s worked incredibly well and can be
> consumed by the KafkaIO source as expected. This works great, however it
> requires that the data is injected into the topic prior to starting the
> pipeline:
>
> ```
> injectIntoTopic(...)
>
> pipeline
>     .apply(
>         KafkaIO.read(...)
>     )
>
> pipeline.run()
> ```
>
> This results in all of the data being consumed at once and thus not
> treating allowed lateness like I would imagine. So my question is this:
>
> Is it possible to create a unit test to send records to a Kafka topic with
> some given (even artificial) delay so that I could verify things like
> allowed lateness within a pipeline?
>
> Essentially like using a TestStream with all of its notions of
> “advanceProcessingTime” in conjunction with inserting those delayed records
> into Kafka.
>
> Does that make sense? Or should I just rely on the use of a TestStream and
> circumvent Kafka entirely? It doesn’t seem like the right thing to do, but
> I don’t currently see a way to work around this (I’ve tried defining
> timestamps at the ProducerRecord level, adding sleeps between records, etc.)
>
> Any advice would be appreciated!
>
> [1] : https://github.com/salesforce/kafka-junit
>
