Posted to dev@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2018/11/20 22:50:00 UTC
[jira] [Resolved] (KAFKA-7536) TopologyTestDriver cannot pre-populate KTable or GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang resolved KAFKA-7536.
----------------------------------
Resolution: Fixed
Assignee: Guozhang Wang
Fix Version/s: 2.0.2, 2.1.1, 2.2.0
> TopologyTestDriver cannot pre-populate KTable or GlobalKTable
> -------------------------------------------------------------
>
> Key: KAFKA-7536
> URL: https://issues.apache.org/jira/browse/KAFKA-7536
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.0.0
> Reporter: Dmitry Minkovsky
> Assignee: Guozhang Wang
> Priority: Minor
> Fix For: 2.2.0, 2.1.1, 2.0.2
>
>
> I have a GlobalKTable that's defined as
> {code}
> GlobalKTable<String, ByteString> userIdsByEmail = topology
>     .globalTable(USER_IDS_BY_EMAIL.name,
>                  USER_IDS_BY_EMAIL.consumed(),
>                  Materialized.as("user-ids-by-email"));
> {code}
> And the following test in Spock:
> {code}
> def topology = // my topology
> def driver = new TopologyTestDriver(topology, config())
>
> def cleanup() {
>     driver.close()
> }
>
> def "create from email request"() {
>     def store = driver.getKeyValueStore('user-ids-by-email')
>     store.put('string', ByteString.copyFrom(new byte[0]))
>     // more, but it fails at the `put` above
> }
> {code}
> When I run this, I get the following:
> {code}
> [2018-10-23 19:35:27,055] INFO (org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl) mock Restoring state for global store user-ids-by-email
> java.lang.NullPointerException
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:237)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:220)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38)
>     at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
>     at pony.message.MessageWriteStreamsTest.create from mailgun email request(MessageWriteStreamsTest.groovy:52)
> [2018-10-23 19:35:27,189] INFO (org.apache.kafka.streams.processor.internals.StateDirectory) stream-thread [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
> {code}
> The same issue applies to KTable.
> I've noticed that I can {{put()}} to the store if I first write to it with {{driver.pipeInput}}; otherwise I get the above error.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)