Posted to user@flink.apache.org by David Magalhães <sp...@gmail.com> on 2020/02/13 18:40:57 UTC

Test sink behaviour

Hi, I've created a CustomSink that writes Parquet files to S3. Inside the
`invoke` method I have a loop that checks whether S3 is down and, if it is,
waits with exponential backoff until it is available again.
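A minimal sketch of the kind of loop described above, in Scala. `Backoff`, `waitUntilAvailable` and the `isAvailable` probe are hypothetical names, not part of Flink or any S3 client. The sleep function is passed in so that a test can record the delays instead of actually waiting.

```scala
import scala.annotation.tailrec

// Hypothetical helper: polls `isAvailable` and, while it returns false, sleeps
// with an exponentially growing delay capped at maxDelayMs.
// Returns how many times it had to wait before the probe succeeded.
object Backoff {
  def waitUntilAvailable(
      isAvailable: () => Boolean,
      initialDelayMs: Long,
      maxDelayMs: Long,
      sleep: Long => Unit = ms => Thread.sleep(ms) // injectable for tests
  ): Int = {
    @tailrec
    def loop(delayMs: Long, retries: Int): Int =
      if (isAvailable()) retries
      else {
        sleep(delayMs)
        // Double the delay for the next round, but never exceed the cap.
        loop(math.min(delayMs * 2, maxDelayMs), retries + 1)
      }
    loop(initialDelayMs, 0)
  }
}
```

Called from `invoke` with the default `sleep`, this blocks the task thread until the probe succeeds, so upstream operators are backpressured while S3 is down.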

Now I want to write a test for this. I can run everything and see that the
sink does what it is supposed to do, but I don't have a way to validate that
programmatically (in an integration test).

One possibility I considered was checking the LazyLogger errors to verify
that something was logged, but I can't mock Logger, since it is final.
Since I expose the number of errors as a counter metric, I tried to find a
way to access it directly from Scala, but the only way I found was via the
REST API, which feels like a hack.

Example:

- Get the REST API port with
`flinkCluster.getClusterClient.getFlinkConfiguration.getInteger("rest.port", 0)`
- Get the jobId via http://localhost:61869/jobs/
- Get the vertexId via
http://localhost:61869/jobs/c3879cca4ba23ad734b2810ba0d73873
- Get the metric via
http://localhost:61869/jobs/c3879cca4ba23ad734b2810ba0d73873/vertices/0a448493b4782967b150582570326227/metrics/?get=0.Sink__Unnamed.errors_sink
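The REST steps above can be scripted roughly as follows. This is a sketch assuming JDK 11+ (`java.net.http.HttpClient`); `FlinkRestMetrics`, `metricUri` and `fetchRaw` are hypothetical helpers, and the job ID, vertex ID and metric name still have to come from the earlier `/jobs` and `/jobs/<jobId>` responses. The response body is returned as raw JSON and is not parsed here.

```scala
import java.net.{URI, URLEncoder}
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.nio.charset.StandardCharsets

// Hypothetical helpers around the per-vertex metrics endpoint of the Flink REST API.
object FlinkRestMetrics {
  // Builds http://localhost:<port>/jobs/<jobId>/vertices/<vertexId>/metrics?get=<metric>
  def metricUri(restPort: Int, jobId: String, vertexId: String, metric: String): URI = {
    // Metric names like "0.Sink__Unnamed.errors_sink" pass through URL encoding unchanged.
    val encoded = URLEncoder.encode(metric, StandardCharsets.UTF_8)
    URI.create(s"http://localhost:$restPort/jobs/$jobId/vertices/$vertexId/metrics?get=$encoded")
  }

  // Performs a blocking GET and returns the raw response body (JSON text, unparsed).
  def fetchRaw(uri: URI): String = {
    val client = HttpClient.newHttpClient()
    val request = HttpRequest.newBuilder(uri).GET().build()
    client.send(request, HttpResponse.BodyHandlers.ofString()).body()
  }
}
```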

Is there a better way to get the metric, or to test this?

Thanks

Re: Test sink behaviour

Posted by Till Rohrmann <tr...@apache.org>.
Hi David,

if you want to test the behavior together with S3, then you could check
that S3 contains a file after the job has completed.

If you want to test the failure and retry behaviour, then I would suggest
introducing your own abstraction for the S3 access which you can control.
That way you can provide a testing implementation which imitates the
described behaviour (first unavailable, later reachable). This should let
you test the behaviour pretty well without having to access metrics.
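A minimal sketch of that suggestion, assuming the sink is changed to write through a hypothetical `ObjectStore` trait instead of calling S3 directly. `FlakyStore` is a test double that fails a configurable number of times before succeeding, and `RetryingWriter` stands in for the retry logic inside `invoke`; none of these names come from Flink or the AWS SDK.

```scala
import java.io.IOException
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap

// Hypothetical abstraction the sink depends on instead of the S3 client.
trait ObjectStore extends Serializable {
  def put(key: String, bytes: Array[Byte]): Unit
}

// Test double: throws IOException for the first `failures` calls, then stores the data.
class FlakyStore(failures: Int) extends ObjectStore {
  override def put(key: String, bytes: Array[Byte]): Unit = {
    val attempt = FlakyStore.attempts.incrementAndGet()
    if (attempt <= failures) throw new IOException(s"simulated outage (attempt $attempt)")
    FlakyStore.written.put(key, bytes)
  }
}

// State is kept in the companion object because Flink serializes the sink (and the
// store inside it), so the test never holds the instance that actually runs.
object FlakyStore {
  val attempts = new AtomicInteger(0)
  val written = TrieMap.empty[String, Array[Byte]]
  def reset(): Unit = { attempts.set(0); written.clear() }
}

// Hypothetical retry logic under test: on failure, sleep and double the delay.
class RetryingWriter(store: ObjectStore, initialDelayMs: Long, sleep: Long => Unit)
    extends Serializable {
  def write(key: String, bytes: Array[Byte]): Unit = {
    var delay = initialDelayMs
    var done = false
    while (!done) {
      try { store.put(key, bytes); done = true }
      catch { case _: IOException => sleep(delay); delay *= 2 }
    }
  }
}
```

In a MiniCluster integration test, the sink would be built with `new FlakyStore(2)` in place of the S3-backed store, and the test would assert on `FlakyStore.attempts` once the job finishes. This relies on the job running in the same JVM as the test.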

Cheers,
Till
