You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Niels Basjes <Ni...@basjes.nl> on 2020/07/15 18:48:38 UTC

Terminating a streaming integration test

Hi,

I'm testing to see how I can build an integration test for a Beam
application that uses PubSub.

So I start the Google provided PubSub emulator, create things like topic
and subscription, put in some validation messages and then run the job
against that and verify the data that comes out.
I'm logging the events to the screen and there I see the data coming in and
being processed.

The problem I have is that I have not been able to figure out how to
cleanly terminate this stream after it has processed all my messages.

I have also inserted some 'stop' messages to enable triggering a "we're
done, you can stop now".

I've been digging through documentation and the apis and found nothing that
works.

This does not work: pipeline.run().waitUntilFinish(Duration.millis(2000))

I have tried setting the timestamp of those to MAX_LONG and
TIMESTAMP_MAX_VALUE but that yielded exceptions.

So far I have not been able to figure out how to tell the TestPipeline:
Finish what you have and shutdown.

How do I do that?

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Terminating a streaming integration test

Posted by Luke Cwik <lc...@google.com>.
For unit testing, I would recommend using TestStream + PAssert where
TestStream replaces the Pubsub source. You can see some examples in
LeaderBoardTest[1]

For integration testing, I would recommend using a metric and cancelling
the pipeline from another thread once the condition is met. There is some
code in the TestDataflowRunner[2] that does this (this code checks for
PAssert success/failure metrics but you can look for any metric that you
want).

1:
https://github.com/apache/beam/blob/master/examples/java/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
2:
https://github.com/apache/beam/blob/de1c14777d3c6a1231361db12f3a0b9fd3b84b3e/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java#L145

On Thu, Jul 16, 2020 at 11:16 AM Niels Basjes <Ni...@basjes.nl> wrote:

> What is the cleanest way to detect the pipeline can be cancelled?
> If the pipeline runs as intended I think I should stop on a "stop"
> message. And in case of problems I should stop on a timeout.
>
> Do you know of an example that does this?
>
> Niels
>
> On Thu, 16 Jul 2020, 18:34 Luke Cwik, <lc...@google.com> wrote:
>
>> Have you tried cancelling[1] the pipeline once your condition is met?
>>
>> 1:
>> https://github.com/apache/beam/blob/d957ce480604be623b0df77e62948c3e0d2c2453/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java#L46
>>
>> On Thu, Jul 16, 2020 at 7:22 AM Alexey Romanenko <
>> aromanenko.dev@gmail.com> wrote:
>>
>>> Hi Niels,
>>>
>>> AFAICT, for that reason in KafkaIOIT [1] we use
>>> “.withMaxNumRecords(numRecords)” where “numRecords” is actually a
>>> cardinality of input dataset. It’s not 100% faire imo, since in this case
>>> UnboundedRead becomes BoundedReadFromUnboundedSource, and we still use
>>> “waitUntilFinish(timeout)". Though, I’m not sure PubSubIO supports this.
>>>
>>> Also, I believe that “p.run()” should be asynchronous by default [2] and
>>> I guess it’s blocked only for DirectRunner (perhaps in a sense of testing
>>> reasons).
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
>>> [2]
>>> https://beam.apache.org/documentation/pipelines/create-your-pipeline/#running-your-pipeline
>>>
>>> On 16 Jul 2020, at 15:02, Niels Basjes <Ni...@basjes.nl> wrote:
>>>
>>> Hi,
>>>
>>> I found a way that seems to work.
>>>
>>> I have
>>>
>>> @Rule
>>>
>>> public final transient TestPipeline pipeline = TestPipeline.create();
>>>
>>> in the test I configure PubSub to connect to the emulator
>>>
>>> PubsubOptions options = pipeline.getOptions().as(PubsubOptions.class);
>>>
>>> options.setProject(PROJECT_NAME);
>>>
>>> options.setPubsubRootUrl(getPubsubRootUrl());
>>>
>>> options.setCredentialFactoryClass(EmulatorCredentialsFactory.class);
>>>
>>> options.setStreaming(true);
>>>
>>> I then hook my processing to this pipeline and afterwards I do this:
>>>
>>> *pipeline
>>> *        .getOptions()        .as(DirectOptions.class)        .setBlockOnRun(false);
>>>
>>> PipelineResult job = pipeline.run();
>>>
>>> long waitTime = 5000;
>>>
>>> LOG.info("Waiting ... {} seconds", waitTime/1000);
>>>
>>> job.waitUntilFinish(Duration.millis(waitTime));
>>>
>>>
>>> Although this works it will fail my build on a slow machine.
>>>
>>> Is this the best option? Or can I inject a "stop" message in my stream and let the pipeline shutdown once it sees that?
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> Can you please indicate if this is a valid way of doing this?
>>> Thanks.
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Jul 15, 2020 at 8:48 PM Niels Basjes <Ni...@basjes.nl> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm testing to see how I can build an integration test for a Beam
>>>> application that uses PubSub.
>>>>
>>>> So I start the Google provided PubSub emulator, create things like
>>>> topic and subscription, put in some validation messages and then run the
>>>> job against that and verify the data that comes out.
>>>> I'm logging the events to the screen and there I see the data coming in
>>>> and being processed.
>>>>
>>>> The problem I have is that I have not been able to figure out how to
>>>> cleanly terminate this stream after it has processed all my messages.
>>>>
>>>> I have also inserted some 'stop' messages to enable triggering a "we're
>>>> done, you can stop now".
>>>>
>>>> I've been digging through documentation and the apis and found nothing
>>>> that works.
>>>>
>>>> This does not work: pipeline.run().waitUntilFinish(Duration.millis(2000
>>>> ))
>>>>
>>>> I have tried setting the timestamp of those to MAX_LONG and
>>>> TIMESTAMP_MAX_VALUE but that yielded exceptions.
>>>>
>>>> So far I have not been able to figure out how to tell the TestPipeline:
>>>> Finish what you have and shutdown.
>>>>
>>>> How do I do that?
>>>>
>>>> --
>>>> Best regards / Met vriendelijke groeten,
>>>>
>>>> Niels Basjes
>>>>
>>>
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>>
>>>
>>>

Re: Terminating a streaming integration test

Posted by Niels Basjes <Ni...@basjes.nl>.
What is the cleanest way to detect the pipeline can be cancelled?
If the pipeline runs as intended I think I should stop on a "stop" message.
And in case of problems I should stop on a timeout.

Do you know of an example that does this?

Niels

On Thu, 16 Jul 2020, 18:34 Luke Cwik, <lc...@google.com> wrote:

> Have you tried cancelling[1] the pipeline once your condition is met?
>
> 1:
> https://github.com/apache/beam/blob/d957ce480604be623b0df77e62948c3e0d2c2453/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java#L46
>
> On Thu, Jul 16, 2020 at 7:22 AM Alexey Romanenko <ar...@gmail.com>
> wrote:
>
>> Hi Niels,
>>
>> AFAICT, for that reason in KafkaIOIT [1] we use
>> “.withMaxNumRecords(numRecords)” where “numRecords” is actually a
>> cardinality of input dataset. It’s not 100% faire imo, since in this case
>> UnboundedRead becomes BoundedReadFromUnboundedSource, and we still use
>> “waitUntilFinish(timeout)". Though, I’m not sure PubSubIO supports this.
>>
>> Also, I believe that “p.run()” should be asynchronous by default [2] and
>> I guess it’s blocked only for DirectRunner (perhaps in a sense of testing
>> reasons).
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
>> [2]
>> https://beam.apache.org/documentation/pipelines/create-your-pipeline/#running-your-pipeline
>>
>> On 16 Jul 2020, at 15:02, Niels Basjes <Ni...@basjes.nl> wrote:
>>
>> Hi,
>>
>> I found a way that seems to work.
>>
>> I have
>>
>> @Rule
>>
>> public final transient TestPipeline pipeline = TestPipeline.create();
>>
>> in the test I configure PubSub to connect to the emulator
>>
>> PubsubOptions options = pipeline.getOptions().as(PubsubOptions.class);
>>
>> options.setProject(PROJECT_NAME);
>>
>> options.setPubsubRootUrl(getPubsubRootUrl());
>>
>> options.setCredentialFactoryClass(EmulatorCredentialsFactory.class);
>>
>> options.setStreaming(true);
>>
>> I then hook my processing to this pipeline and afterwards I do this:
>>
>> *pipeline
>> *        .getOptions()        .as(DirectOptions.class)        .setBlockOnRun(false);
>>
>> PipelineResult job = pipeline.run();
>>
>> long waitTime = 5000;
>>
>> LOG.info("Waiting ... {} seconds", waitTime/1000);
>>
>> job.waitUntilFinish(Duration.millis(waitTime));
>>
>>
>> Although this works it will fail my build on a slow machine.
>>
>> Is this the best option? Or can I inject a "stop" message in my stream and let the pipeline shutdown once it sees that?
>>
>>
>>
>>
>>
>>
>>
>> Can you please indicate if this is a valid way of doing this?
>> Thanks.
>>
>>
>>
>>
>>
>> On Wed, Jul 15, 2020 at 8:48 PM Niels Basjes <Ni...@basjes.nl> wrote:
>>
>>> Hi,
>>>
>>> I'm testing to see how I can build an integration test for a Beam
>>> application that uses PubSub.
>>>
>>> So I start the Google provided PubSub emulator, create things like topic
>>> and subscription, put in some validation messages and then run the job
>>> against that and verify the data that comes out.
>>> I'm logging the events to the screen and there I see the data coming in
>>> and being processed.
>>>
>>> The problem I have is that I have not been able to figure out how to
>>> cleanly terminate this stream after it has processed all my messages.
>>>
>>> I have also inserted some 'stop' messages to enable triggering a "we're
>>> done, you can stop now".
>>>
>>> I've been digging through documentation and the apis and found nothing
>>> that works.
>>>
>>> This does not work: pipeline.run().waitUntilFinish(Duration.millis(2000
>>> ))
>>>
>>> I have tried setting the timestamp of those to MAX_LONG and
>>> TIMESTAMP_MAX_VALUE but that yielded exceptions.
>>>
>>> So far I have not been able to figure out how to tell the TestPipeline:
>>> Finish what you have and shutdown.
>>>
>>> How do I do that?
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>>
>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>>
>>

Re: Terminating a streaming integration test

Posted by Luke Cwik <lc...@google.com>.
Have you tried cancelling[1] the pipeline once your condition is met?

1:
https://github.com/apache/beam/blob/d957ce480604be623b0df77e62948c3e0d2c2453/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java#L46

On Thu, Jul 16, 2020 at 7:22 AM Alexey Romanenko <ar...@gmail.com>
wrote:

> Hi Niels,
>
> AFAICT, for that reason in KafkaIOIT [1] we use
> “.withMaxNumRecords(numRecords)” where “numRecords” is actually a
> cardinality of input dataset. It’s not 100% faire imo, since in this case
> UnboundedRead becomes BoundedReadFromUnboundedSource, and we still use
> “waitUntilFinish(timeout)". Though, I’m not sure PubSubIO supports this.
>
> Also, I believe that “p.run()” should be asynchronous by default [2] and I
> guess it’s blocked only for DirectRunner (perhaps in a sense of testing
> reasons).
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
> [2]
> https://beam.apache.org/documentation/pipelines/create-your-pipeline/#running-your-pipeline
>
> On 16 Jul 2020, at 15:02, Niels Basjes <Ni...@basjes.nl> wrote:
>
> Hi,
>
> I found a way that seems to work.
>
> I have
>
> @Rule
>
> public final transient TestPipeline pipeline = TestPipeline.create();
>
> in the test I configure PubSub to connect to the emulator
>
> PubsubOptions options = pipeline.getOptions().as(PubsubOptions.class);
>
> options.setProject(PROJECT_NAME);
>
> options.setPubsubRootUrl(getPubsubRootUrl());
>
> options.setCredentialFactoryClass(EmulatorCredentialsFactory.class);
>
> options.setStreaming(true);
>
> I then hook my processing to this pipeline and afterwards I do this:
>
> *pipeline
> *        .getOptions()        .as(DirectOptions.class)        .setBlockOnRun(false);
>
> PipelineResult job = pipeline.run();
>
> long waitTime = 5000;
>
> LOG.info("Waiting ... {} seconds", waitTime/1000);
>
> job.waitUntilFinish(Duration.millis(waitTime));
>
>
> Although this works it will fail my build on a slow machine.
>
> Is this the best option? Or can I inject a "stop" message in my stream and let the pipeline shutdown once it sees that?
>
>
>
>
>
>
>
> Can you please indicate if this is a valid way of doing this?
> Thanks.
>
>
>
>
>
> On Wed, Jul 15, 2020 at 8:48 PM Niels Basjes <Ni...@basjes.nl> wrote:
>
>> Hi,
>>
>> I'm testing to see how I can build an integration test for a Beam
>> application that uses PubSub.
>>
>> So I start the Google provided PubSub emulator, create things like topic
>> and subscription, put in some validation messages and then run the job
>> against that and verify the data that comes out.
>> I'm logging the events to the screen and there I see the data coming in
>> and being processed.
>>
>> The problem I have is that I have not been able to figure out how to
>> cleanly terminate this stream after it has processed all my messages.
>>
>> I have also inserted some 'stop' messages to enable triggering a "we're
>> done, you can stop now".
>>
>> I've been digging through documentation and the apis and found nothing
>> that works.
>>
>> This does not work: pipeline.run().waitUntilFinish(Duration.millis(2000))
>>
>> I have tried setting the timestamp of those to MAX_LONG and
>> TIMESTAMP_MAX_VALUE but that yielded exceptions.
>>
>> So far I have not been able to figure out how to tell the TestPipeline:
>> Finish what you have and shutdown.
>>
>> How do I do that?
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
>
>

Re: Terminating a streaming integration test

Posted by Alexey Romanenko <ar...@gmail.com>.
Hi Niels,

AFAICT, for that reason in KafkaIOIT [1] we use “.withMaxNumRecords(numRecords)” where “numRecords” is actually a cardinality of input dataset. It’s not 100% faire imo, since in this case UnboundedRead becomes BoundedReadFromUnboundedSource, and we still use “waitUntilFinish(timeout)". Though, I’m not sure PubSubIO supports this.

Also, I believe that “p.run()” should be asynchronous by default [2] and I guess it’s blocked only for DirectRunner (perhaps in a sense of testing reasons).

[1] https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java>
[2] https://beam.apache.org/documentation/pipelines/create-your-pipeline/#running-your-pipeline <https://beam.apache.org/documentation/pipelines/create-your-pipeline/#running-your-pipeline>

> On 16 Jul 2020, at 15:02, Niels Basjes <Ni...@basjes.nl> wrote:
> 
> Hi,
> 
> I found a way that seems to work.
> 
> I have 
> @Rule
> public final transient TestPipeline pipeline = TestPipeline.create();
> in the test I configure PubSub to connect to the emulator
> PubsubOptions options = pipeline.getOptions().as(PubsubOptions.class);
> options.setProject(PROJECT_NAME);
> options.setPubsubRootUrl(getPubsubRootUrl());
> options.setCredentialFactoryClass(EmulatorCredentialsFactory.class);
> options.setStreaming(true);
> I then hook my processing to this pipeline and afterwards I do this:
> pipeline
>         .getOptions()
>         .as(DirectOptions.class)
>         .setBlockOnRun(false);
> PipelineResult job = pipeline.run();
> long waitTime = 5000;
> LOG.info("Waiting ... {} seconds", waitTime/1000);
> job.waitUntilFinish(Duration.millis(waitTime));
> 
> Although this works it will fail my build on a slow machine.
> Is this the best option? Or can I inject a "stop" message in my stream and let the pipeline shutdown once it sees that?
> 
> 
> 
> 
> 
> 
> Can you please indicate if this is a valid way of doing this?
> Thanks.
> 
> 
> 
> 
> 
> On Wed, Jul 15, 2020 at 8:48 PM Niels Basjes <Niels@basjes.nl <ma...@basjes.nl>> wrote:
> Hi,
> 
> I'm testing to see how I can build an integration test for a Beam application that uses PubSub.
> 
> So I start the Google provided PubSub emulator, create things like topic and subscription, put in some validation messages and then run the job against that and verify the data that comes out.
> I'm logging the events to the screen and there I see the data coming in and being processed.
> 
> The problem I have is that I have not been able to figure out how to cleanly terminate this stream after it has processed all my messages. 
> 
> I have also inserted some 'stop' messages to enable triggering a "we're done, you can stop now".
> 
> I've been digging through documentation and the apis and found nothing that works.
> 
> This does not work: pipeline.run().waitUntilFinish(Duration.millis(2000))
> 
> I have tried setting the timestamp of those to MAX_LONG and TIMESTAMP_MAX_VALUE but that yielded exceptions.
> 
> So far I have not been able to figure out how to tell the TestPipeline: Finish what you have and shutdown.
> 
> How do I do that?
> 
> -- 
> Best regards / Met vriendelijke groeten,
> 
> Niels Basjes
> 
> 
> -- 
> Best regards / Met vriendelijke groeten,
> 
> Niels Basjes


Re: Terminating a streaming integration test

Posted by Niels Basjes <Ni...@basjes.nl>.
Hi,

I found a way that seems to work.

I have

@Rule

public final transient TestPipeline pipeline = TestPipeline.create();

in the test I configure PubSub to connect to the emulator

PubsubOptions options = pipeline.getOptions().as(PubsubOptions.class);

options.setProject(PROJECT_NAME);

options.setPubsubRootUrl(getPubsubRootUrl());

options.setCredentialFactoryClass(EmulatorCredentialsFactory.class);

options.setStreaming(true);

I then hook my processing to this pipeline and afterwards I do this:

*pipeline
*        .getOptions()        .as(DirectOptions.class)
.setBlockOnRun(false);

PipelineResult job = pipeline.run();

long waitTime = 5000;

LOG.info("Waiting ... {} seconds", waitTime/1000);

job.waitUntilFinish(Duration.millis(waitTime));


Although this works it will fail my build on a slow machine.

Is this the best option? Or can I inject a "stop" message in my stream
and let the pipeline shutdown once it sees that?







Can you please indicate if this is a valid way of doing this?
Thanks.





On Wed, Jul 15, 2020 at 8:48 PM Niels Basjes <Ni...@basjes.nl> wrote:

> Hi,
>
> I'm testing to see how I can build an integration test for a Beam
> application that uses PubSub.
>
> So I start the Google provided PubSub emulator, create things like topic
> and subscription, put in some validation messages and then run the job
> against that and verify the data that comes out.
> I'm logging the events to the screen and there I see the data coming in
> and being processed.
>
> The problem I have is that I have not been able to figure out how to
> cleanly terminate this stream after it has processed all my messages.
>
> I have also inserted some 'stop' messages to enable triggering a "we're
> done, you can stop now".
>
> I've been digging through documentation and the apis and found nothing
> that works.
>
> This does not work: pipeline.run().waitUntilFinish(Duration.millis(2000))
>
> I have tried setting the timestamp of those to MAX_LONG and
> TIMESTAMP_MAX_VALUE but that yielded exceptions.
>
> So far I have not been able to figure out how to tell the TestPipeline:
> Finish what you have and shutdown.
>
> How do I do that?
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes