Posted to dev@kafka.apache.org by "saiprasad mishra (JIRA)" <ji...@apache.org> on 2016/10/26 21:21:58 UTC

[jira] [Comment Edited] (KAFKA-4344) Exception when accessing partition, offset and timestamp in processor class

    [ https://issues.apache.org/jira/browse/KAFKA-4344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15609709#comment-15609709 ] 

saiprasad mishra edited comment on KAFKA-4344 at 10/26/16 9:21 PM:
-------------------------------------------------------------------

Sorry, I was writing my reply while you were writing your comments :). I have a lot of business logic and could not share all of it. I do populate the local context variable from the ProcessorContext when init() is called.

I will be happy to close this if you agree.
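
For reference, here is a rough, self-contained sketch of the pattern I described: keep the context handed over in init() and read the partition, offset and timestamp from it inside process(). It deliberately does not use the Kafka Streams classes. RecordMetadataContext, SketchContext and MyStoreSketch are hypothetical stand-ins, and the guard only mirrors the wording of the exception quoted below, so treat it as an assumption about the contract rather than the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the record metadata a processor context exposes.
// This is NOT Kafka's ProcessorContext.
interface RecordMetadataContext {
    int partition();
    long offset();
    long timestamp();
}

// Hypothetical context: metadata is readable only between beginRecord() and endRecord().
final class SketchContext implements RecordMetadataContext {
    private boolean recordActive = false;
    private int partition;
    private long offset;
    private long timestamp;

    void beginRecord(int partition, long offset, long timestamp) {
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
        this.recordActive = true;
    }

    void endRecord() {
        this.recordActive = false;
    }

    // Assumed guard, modelled on the message of the reported IllegalStateException.
    private void requireRecord(String method) {
        if (!recordActive) {
            throw new IllegalStateException(
                method + "() should only be called while a record is processed");
        }
    }

    @Override public int partition() { requireRecord("partition"); return partition; }
    @Override public long offset() { requireRecord("offset"); return offset; }
    @Override public long timestamp() { requireRecord("timestamp"); return timestamp; }
}

// Hypothetical processor: stores the context in init() and reads it in process().
final class MyStoreSketch {
    private RecordMetadataContext context;
    final List<String> seen = new ArrayList<>();

    void init(RecordMetadataContext context) {
        this.context = context;
    }

    void process(String key, String value) {
        // Metadata is read at processing time, not cached at init time.
        seen.add(key + "@" + context.partition() + ":" + context.offset()
            + ":" + context.timestamp());
    }
}

final class ProcessorContextSketchDemo {
    public static void main(String[] args) {
        SketchContext context = new SketchContext();
        MyStoreSketch store = new MyStoreSketch();
        store.init(context);

        context.beginRecord(0, 42L, 1477516918000L);
        store.process("id-1", "payload");
        context.endRecord();

        System.out.println(store.seen);
    }
}
```

In this sketch, calling store.process() after endRecord() throws an IllegalStateException, which is the situation the exception message describes.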




was (Author: saimishra):
Sorry, I was writing my reply while you were writing your comments :). I have a lot of business logic and could not share all of it. I do populate the local context variable from the ProcessorContext when init() is called.



> Exception when accessing partition, offset and timestamp in processor class
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-4344
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4344
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.0
>            Reporter: saiprasad mishra
>            Assignee: Guozhang Wang
>            Priority: Minor
>
> I have a Kafka Streams pipeline like the one below:
> source topic stream -> filter for null value -> map to make it keyed by id -> custom processor to mystore -> to another topic -> ktable
> I am hitting the exception below in a custom processor class when I try to access offset(), partition() or timestamp() from the ProcessorContext in the process() method. I was hoping it would return the partition and offset of the enclosing topic (in this case the source topic) it is consuming from, or -1, based on the API docs.
> java.lang.IllegalStateException: This should not happen as offset() should only be called while a record is processed
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
> 	at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]


