Posted to user@flink.apache.org by Rion Williams <ri...@gmail.com> on 2021/03/03 19:10:43 UTC

Unit Testing State Stores in KeyedProcessFunctions

Hi all!

Is it possible to assert against the underlying state stores
within a KeyedProcessFunction using the existing
KeyedOneInputStreamOperatorTestHarness class in unit tests? Basically, I
want to pass in two elements with different keys and then query the
underlying state store for each key to confirm that state is kept
separately per key. I don’t really see a mechanism that would support
such behavior (i.e. "give me the state store for key n" from the operator).

@Test
fun `Verify that instances with different keys retain separate watermarks`() {
    // Arrange
    val logs = listOf(
        StreamRecord(TestGenerator.generateLog(tenant = "A")),
        StreamRecord(TestGenerator.generateLog(tenant = "B")),
    )

    // Act
    magicWindowHarness
        .processElements(logs)

    // Assert (I'd like to access the state by key for each here)
    assert(magicWindowHarness.getStateForKey("A"), ...)
    assert(magicWindowHarness.getStateForKey("B"), ...)
}

Is something like this possible, or is there a better way to access the
underlying state store? It seemed to work as expected when only a single
key was involved, but when multiple keys were involved, things seemed to
fall apart. The current testing documentation [0] is fantastic; however, I
think this might qualify as a more advanced task than it covers.

At present, all of the state stores of the underlying function are privately
declared, which may or may not be relevant:

@Transient private lateinit var watermark: ValueState<Long>
@Transient private lateinit var scheduledEvictions: MapState<Long, Long>

Any recommendations or advice would be greatly appreciated and I'll be
happy to provide any additional context/details as needed.

Thanks a lot!

Rion

[0]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html

Re: Unit Testing State Stores in KeyedProcessFunctions

Posted by Rion Williams <ri...@gmail.com>.
Thanks Chesnay,

I agree that output testing is more practical and far less brittle; I was just curious whether support was there for it. I have a specific use case where I’m managing my own windows and may schedule something to be emitted after some processing-time delay, so it could be valuable to see this scheduling in state, since it may not directly coincide with output.

Not a huge deal. I already have tests in place that function as black boxes with output verification; it was more a question of whether this was supported.
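
For the delayed-emission case, a black-box test can still observe the scheduling indirectly by advancing the harness clock. The sketch below is only an illustration: DelayedEmitFunction and scheduleThenFire are hypothetical names, and it assumes a Flink 1.x setup with the flink-streaming-java test classes on the test classpath. It counts pending processing-time timers and checks that nothing is emitted until the delay has elapsed.

```kotlin
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.util.Collector

// Hypothetical function: schedules an emission delayMs after each element arrives.
class DelayedEmitFunction(private val delayMs: Long) :
    KeyedProcessFunction<String, String, String>() {

    override fun processElement(tenant: String, ctx: Context, out: Collector<String>) {
        val fireAt = ctx.timerService().currentProcessingTime() + delayMs
        ctx.timerService().registerProcessingTimeTimer(fireAt)
    }

    override fun onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector<String>) {
        out.collect("evicted:${ctx.currentKey}")
    }
}

// Returns (pending timers after the element, output before the delay, output after it).
fun scheduleThenFire(): Triple<Int, List<String>, List<String>> {
    val harness = KeyedOneInputStreamOperatorTestHarness<String, String, String>(
        KeyedProcessOperator(DelayedEmitFunction(1_000L)),
        KeySelector<String, String> { it },
        Types.STRING
    )
    harness.open()
    harness.setProcessingTime(0L)        // pin the test clock to a known start
    harness.processElement("A", 0L)      // registers a timer for t = 1000

    val pendingTimers = harness.numProcessingTimeTimers()
    val before = harness.extractOutputValues()

    harness.setProcessingTime(1_000L)    // advancing the clock fires the timer
    val after = harness.extractOutputValues()

    harness.close()
    return Triple(pendingTimers, before, after)
}
```

The timer count gives a coarse view of what has been scheduled without reaching into the function's private state, while the emitted values remain the main thing being asserted.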

Thanks much,

Rion

> On Mar 3, 2021, at 2:44 PM, Chesnay Schepler <ch...@apache.org> wrote:
> 
> 
> I do not believe this to be possible.
> 
> Given that the state will likely in some form affect the behavior of the function (usually with regard to what it outputs), it may be a better idea to test for that. (I suppose you'd want tests like that anyway.)
> 

Re: Unit Testing State Stores in KeyedProcessFunctions

Posted by Chesnay Schepler <ch...@apache.org>.
I do not believe this to be possible.

Given that the state will likely in some form affect the behavior of the
function (usually with regard to what it outputs), it may be a better
idea to test for that. (I suppose you'd want tests like that anyway.)
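
As a rough illustration of testing through output rather than state, here is a minimal sketch. Everything in it is assumed for the example and not taken from the original function: the Log type, MaxTimestampFunction, and runHarness are hypothetical, and it targets the Flink 1.x API (open(Configuration)) with the flink-streaming-java test classes available. Each element emits its key's current watermark, so per-key isolation shows up in the output.

```kotlin
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.util.Collector

// Hypothetical event type standing in for the generated logs in the question.
data class Log(val tenant: String, val timestamp: Long)

// Hypothetical function: keeps the highest timestamp seen per key and emits it.
class MaxTimestampFunction : KeyedProcessFunction<String, Log, String>() {
    @Transient private lateinit var watermark: ValueState<Long>

    override fun open(parameters: Configuration) {
        watermark = runtimeContext.getState(ValueStateDescriptor("watermark", Types.LONG))
    }

    override fun processElement(log: Log, ctx: Context, out: Collector<String>) {
        // value() is null the first time a key is seen
        val updated = maxOf(watermark.value() ?: Long.MIN_VALUE, log.timestamp)
        watermark.update(updated)
        out.collect("${log.tenant}:$updated")
    }
}

// Runs the logs through a keyed harness and returns the emitted values in order.
fun runHarness(logs: List<Log>): List<String> {
    val harness = KeyedOneInputStreamOperatorTestHarness<String, Log, String>(
        KeyedProcessOperator(MaxTimestampFunction()),
        KeySelector<Log, String> { it.tenant },
        Types.STRING
    )
    harness.open()
    logs.forEach { harness.processElement(it, it.timestamp) }
    val output = harness.extractOutputValues()
    harness.close()
    return output
}
```

Feeding in Log("A", 10), Log("B", 5), Log("A", 3) should yield "A:10", "B:5", "A:10": the late element for A still reports A's earlier watermark, and B is unaffected by A, which is the behavior the state was meant to guarantee.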
