You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2019/02/14 01:17:00 UTC

[jira] [Comment Edited] (KAFKA-6460) Add mocks for state stores used in Streams unit testing

    [ https://issues.apache.org/jira/browse/KAFKA-6460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767479#comment-16767479 ] 

Guozhang Wang edited comment on KAFKA-6460 at 2/14/19 1:16 AM:
---------------------------------------------------------------

Hi [~shung], sorry for the late replies. Here's my thoughts (I'd also update the description of the ticket as well).

The main goal for this test utils is similar to KIP-267, i.e. to provide users during their development cycles a smooth iterations of trial-and-error experience, where they do not necessarily need to bring up a full fledged rocksDB instance, for example. So the scope of this task would be:

1) provide a mock store interface that can be used for both {{Topology#addStateStore(StoreBuilder)}} (i.e. at the PAPI layer) and the {{Materialized#as(StoreSupplier<KeyValueStore<Bytes, byte[]>>)}} (the DSL layer). 

For this, I'd propose we add {{org.apache.kafka.streams.test.MockStoreFactory}} inside {{streams/test-utils}} artifact, 

that can generate either a {{KeyValueStoreBuilder}} / {{WindowStoreBuilder}}, and {{SessionStoreBuilder}} (for PAPI) as well as a {{StoreSupplier<KeyValueStore<Bytes, byte[]>>}} (for DSL) provided by a store name, whose {{build}} and {{get}} calls will return a mock (this mock implementation only need to be the inner-most store, i.e. it can still be wrapped with metered / caching / logging, and it should be inside {{org.apache.kafka.streams.test.internals}} package so that they are not part of the public APIs).

2) The mock store implementations should expect it's {{init}} function to be called with a {{MockProcessorContext}} which includes recording all records forwarded via this context (e.g. changelogs). So the mock store implementation itself only need to keep track of its store function calls.

3) The mock {{StoreBuilder}} or {{StoreSupplier}}'s {{build}} / {{get}} calls are supposed to be only called once since they are expected to be used with {{TopologyTestDriver}} which does not create multiple topologies. Then users can get access the mocked store instance via {{TopologyTestDriver#getXXStore}} and then use their public APIs to query the store.

4) Additional to allow users to query the store directly, user's may want to also get how many function calls are triggered --  e.g. maybe the current store returns `2` for key `k`, but we also want to make sure it was because `put(k, 1)` and `put(k, 2)` are called. This can be provided by a public API like {{MockStoreFactory#putEntries(storeName)}} that returns a list of entries that are called via {{put}}.

5) For Streams' own unit tests, we can then refactor them to use this new mock store factory. For example, we can remove the internal {{org.apache.kafka.streams.state.KeyValueStoreTestDriver}} and use the above to refactor any unit tests related to this class -- one logic that is not yet supported in {{TopologyTestDriver}} / {{MockStoreFactory}} is store restoration, i.e. streams library may wan to pipe-in some records to the corresponding changelog first before starting the test driver, which will then be used to bootstrap the (possibly mocked) stores. This is not of interest to users, but streams' own unit testing need to cover.



was (Author: guozhang):
Hi [~shung], sorry for the late replies. Here's my thoughts (I'd also update the description of the ticket as well).

The main goal for this test utils is similar to KIP-267, i.e. to provide users during their development cycles a smooth iterations of trial-and-error experience, where they do not necessarily need to bring up a full fledged rocksDB instance, for example. So the scope of this task would be:

1) provide a mock store interface that can be used for both {{Topology#addStateStore(StoreBuilder)}} (i.e. at the PAPI layer) and the {{Materialized#as(StoreSupplier<KeyValueStore<Bytes, byte[]>>)}} (the DSL layer). 

For this, I'd propose we add {{org.apache.kafka.streams.test.MockStoreFactory}} inside {{streams/test-utils}} artifact, 

that can generate either a {{KeyValueStoreBuilder}} / {{WindowStoreBuilder}}, and {{SessionStoreBuilder}} (for PAPI) as well as a {{StoreSupplier<KeyValueStore<Bytes, byte[]>>}} (for DSL) provided by a store name, whose {{build}} and {{get}} calls will return a mock (this mock implementation only need to be the inner-most store, i.e. it can still be wrapped with metered / caching / logging, and it should be inside {{org.apache.kafka.streams.test.internals}} package so that they are not part of the public APIs).

2) The mock store implementations should expect it's {{init}} function to be called with a {{MockProcessorContext}} which includes recording all records forwarded via this context (e.g. changelogs). So the mock store implementation itself only need to keep track of its store function calls.

3) The mock {{StoreBuilder}} or {{StoreSupplier}}'s {{build} / {{get}} calls are supposed to be only called once since they are expected to be used with {{TopologyTestDriver}} which does not create multiple topologies. Then users can get access the mocked store instance via {{TopologyTestDriver#getXXStore}} and then use their public APIs to query the store.

4) Additional to allow users to query the store directly, user's may want to also get how many function calls are triggered --  e.g. maybe the current store returns `2` for key `k`, but we also want to make sure it was because `put(k, 1)` and `put(k, 2)` are called. This can be provided by a public API like {{MockStoreFactory#putEntries(storeName)}} that returns a list of entries that are called via {{put}}.

5) For Streams' own unit tests, we can then refactor them to use this new mock store factory. For example, we can remove the internal {{org.apache.kafka.streams.state.KeyValueStoreTestDriver}} and use the above to refactor any unit tests related to this class -- one logic that is not yet supported in {{TopologyTestDriver}} / {{MockStoreFactory}} is store restoration, i.e. streams library may wan to pipe-in some records to the corresponding changelog first before starting the test driver, which will then be used to bootstrap the (possibly mocked) stores. This is not of interest to users, but streams' own unit testing need to cover.


> Add mocks for state stores used in Streams unit testing
> -------------------------------------------------------
>
>                 Key: KAFKA-6460
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6460
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams, unit tests
>            Reporter: Guozhang Wang
>            Assignee: Yishun Guan
>            Priority: Major
>              Labels: newbie++
>
> We'd like to use mocks for different types of state stores: kv, window, session that can be used to record the number of expected put / get calls used in the DSL operator unit testing. This involves implementing the two interfaces {{StoreSupplier}} and {{StoreBuilder}} that can return a object created from, say, EasyMock, and the object can then be set up with the expected calls.
> In addition, we should also add a mock record collector which can be returned from the mock processor context so that with logging enabled store, users can also validate if the changes have been forwarded to the changelog as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)