Posted to user@flink.apache.org by Marcin Kuthan <ma...@gmail.com> on 2022/02/02 07:18:13 UTC

How to test stateful streaming pipeline?

Hi

This is my first question to the community, so hello everyone :) On a
daily basis I use Apache Beam to develop streaming pipelines, but I
would like to learn native Flink as well. I’m looking for examples of how
to write integration tests with full programmatic control over the
watermark, and with assertions that check whether results are early,
on-time or late.

Let’s assume a “Word Count” aggregation in a fixed/tumbling window. The
function “myWordCount” takes a stream of data (PCollection/Dataset) and
counts the occurrences of each word per fixed window, within the allowed
lateness. The whole pipeline and its input/output are defined outside of
that function.

In the Beam API the test might look like this:

words = testStreamOf[String]
      .addElementsAtTime("00:00:00", "foo")
      .addElementsAtTime("00:00:30", "bar")
      .advanceWatermarkTo("00:01:00")
      .addElementsAtTime("00:00:40", "foo") // late event
      .advanceWatermarkToInfinity()

// function under test
results = myWordCount(words, windowDuration = 1 minute)

results should inOnTimePane("00:00:00", "00:01:00") {
      containInAnyOrderAtTime(Seq(
        ("00:00:59.999", ("foo", 1L)),
        ("00:00:59.999", ("bar", 1L))
      ))
 }

results should inLatePane("00:00:00", "00:01:00") {
      containSingleValueAtTime(
        "00:00:59.999", ("foo", 1L) // “foo” from on-time pane was discarded
      )
 }
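
To make the expected behaviour concrete, here is a minimal sketch in plain Java that models the semantics the assertions above rely on, with no Beam or Flink dependency. Everything in it is an assumption made for illustration: the class `WindowedWordCountSketch`, its methods, the simplified firing rule (a window fires once the watermark reaches its end) and the discarding behaviour for late panes. It is not how either framework is implemented, only a model of what a test with watermark control would need to observe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model for illustration only; none of these names are Beam or Flink APIs.
final class WindowedWordCountSketch {

    /** One emitted result: window start, word, count in this pane, and whether the pane is late. */
    static final class Pane {
        final long windowStartMs;
        final String word;
        final long count;
        final boolean late;

        Pane(long windowStartMs, String word, long count, boolean late) {
            this.windowStartMs = windowStartMs;
            this.word = word;
            this.count = count;
            this.late = late;
        }

        @Override
        public String toString() {
            return (late ? "LATE" : "ON_TIME") + " window@" + windowStartMs
                    + " (" + word + ", " + count + ")";
        }
    }

    private final long windowSizeMs;
    private final long allowedLatenessMs;
    private long watermarkMs = Long.MIN_VALUE;
    // windowStart -> (word -> count) for elements that have not been emitted yet
    private final TreeMap<Long, TreeMap<String, Long>> buffered = new TreeMap<>();
    private final List<Pane> output = new ArrayList<>();

    WindowedWordCountSketch(long windowSizeMs, long allowedLatenessMs) {
        this.windowSizeMs = windowSizeMs;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    /** Adds one word with an event-time timestamp, as a test stream would. */
    void addElement(long timestampMs, String word) {
        long start = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        long end = start + windowSizeMs;
        if (watermarkMs >= end + allowedLatenessMs) {
            return; // beyond allowed lateness: dropped
        }
        if (watermarkMs >= end) {
            // Late but allowed: emit a late pane holding only this element
            // (the on-time pane's contents were discarded when it fired).
            output.add(new Pane(start, word, 1L, true));
            return;
        }
        buffered.computeIfAbsent(start, k -> new TreeMap<>()).merge(word, 1L, Long::sum);
    }

    /** Advances the watermark and fires the on-time pane of every window it has passed. */
    void advanceWatermarkTo(long newWatermarkMs) {
        watermarkMs = Math.max(watermarkMs, newWatermarkMs);
        while (!buffered.isEmpty() && buffered.firstKey() + windowSizeMs <= watermarkMs) {
            Map.Entry<Long, TreeMap<String, Long>> window = buffered.pollFirstEntry();
            window.getValue().forEach(
                    (word, count) -> output.add(new Pane(window.getKey(), word, count, false)));
        }
    }

    List<Pane> output() {
        return output;
    }

    public static void main(String[] args) {
        WindowedWordCountSketch wc = new WindowedWordCountSketch(60_000L, 60_000L);
        wc.addElement(0L, "foo");
        wc.addElement(30_000L, "bar");
        wc.advanceWatermarkTo(60_000L);
        wc.addElement(40_000L, "foo"); // late event
        wc.advanceWatermarkTo(Long.MAX_VALUE);
        for (Pane pane : wc.output()) {
            System.out.println(pane);
        }
    }
}
```

The `main` method replays the same scenario as the Beam test: two on-time elements, a watermark advance to the end of the window, and one late "foo". What I am looking for in Flink is a way to drive a real pipeline like this and assert on the resulting panes.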

I found the documentation for testing stateful UDFs but, frankly speaking, I
don’t know whether it covers only built-in Flink UDFs or custom functions
as well. It also contains only one example test scenario, which is far from
complete.

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators

I also found the flink-spector project. Its API looks promising, but
unfortunately it is not actively maintained anymore:

https://github.com/ottogroup/flink-spector

Could you share some documentation/examples/sources with integration tests
for Flink streaming pipelines, please? Perhaps I've missed something :)

Thanks in advance,
Marcin