You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/11/17 22:23:00 UTC

[jira] [Commented] (KAFKA-7536) TopologyTestDriver cannot pre-populate KTable or GlobalKTable

    [ https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690719#comment-16690719 ] 

ASF GitHub Bot commented on KAFKA-7536:
---------------------------------------

guozhangwang opened a new pull request #5923: KAFKA-7536: Initialize TopologyTestDriver with non-null topic
URL: https://github.com/apache/kafka/pull/5923
 
 
   In TopologyTestDriver constructor set non-null topic; and in unit test intentionally turn on caching to verify this case.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> TopologyTestDriver cannot pre-populate KTable or GlobalKTable
> -------------------------------------------------------------
>
>                 Key: KAFKA-7536
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7536
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Dmitry Minkovsky
>            Priority: Minor
>
> I have a GlobalKTable that's defined as
> {code}
> GlobalKTable<String, ByteString> userIdsByEmail = topology          
>    .globalTable(USER_IDS_BY_EMAIL.name,
>                        USER_IDS_BY_EMAIL.consumed(),
>                        Materialized.as("user-ids-by-email"));
> {code}
> And the following test in Spock:
> {code}
>     def topology = // my topology
>     def driver = new TopologyTestDriver(topology, config())
>     def cleanup() {
>         driver.close()
>     }
>     def "create from email request"() {
>         def store = driver.getKeyValueStore('user-ids-by-email')
>         store.put('string', ByteString.copyFrom(new byte[0]))
>         // more, but it fails at the `put` above
> {code}
> When I run this, I get the following:
> {code}
> [2018-10-23 19:35:27,055] INFO (org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl) mock Restoring state for global store user-ids-by-email
> java.lang.NullPointerException
> 	at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)
> 	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:237)
> 	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:220)
> 	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38)
> 	at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
> 	at pony.message.MessageWriteStreamsTest.create from mailgun email request(MessageWriteStreamsTest.groovy:52)
> [2018-10-23 19:35:27,189] INFO (org.apache.kafka.streams.processor.internals.StateDirectory) stream-thread [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
> {code}
> The same issue applies to KTable.
> I've noticed that I can {{put()}} to the store if I first write to it with {{driver.pipeInput}}. But otherwise I get the above error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)