Posted to users@kafka.apache.org by saiprasad mishra <sa...@gmail.com> on 2016/10/24 04:18:28 UTC

Exception when accessing partition, offset and timestamp in processor class

Hi

This is with my streaming app on Kafka 0.10.1.0.

My flow looks something like this:

source topic stream -> filter for null value -> map to make it keyed by id
-> custom processor to mystore -> to another topic -> ktable

I am hitting the exception below in a custom processor class whenever I
try to access offset(), partition(), or timestamp() on the
ProcessorContext in the process() method. Based on the API docs, I was
hoping it would return the partition and offset of the topic being consumed
(in this case the source topic), or -1.
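
One way to get the "-1 when unavailable" behaviour described above is to
wrap the accessor call and fall back on IllegalStateException. The sketch
below is only an illustration under assumptions: the class and method names
(SafeContextAccess, orMinusOne) are hypothetical, and plain LongSupplier
lambdas stand in for a call such as context.offset() so that the example
runs without Kafka on the classpath.

```java
import java.util.function.LongSupplier;

public class SafeContextAccess {

    /** Returns the accessor's value, or -1 if it throws IllegalStateException. */
    public static long orMinusOne(LongSupplier accessor) {
        try {
            return accessor.getAsLong();
        } catch (IllegalStateException e) {
            // No record metadata available at this point: use the fallback.
            return -1L;
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for () -> context.offset() when a record is in scope.
        LongSupplier available = () -> 42L;
        // Hypothetical stand-in for the failing call shown in the stack trace.
        LongSupplier unavailable = () -> {
            throw new IllegalStateException(
                "offset() should only be called while a record is processed");
        };

        System.out.println(orMinusOne(available));   // prints 42
        System.out.println(orMinusOne(unavailable)); // prints -1
    }
}
```

In a real processor the supplier would be something like
() -> context.offset(). Note that this only hides the symptom; it does not
explain why the metadata is missing inside process().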

It looks like these values are accessible only in certain cases. Are they
getting lost during the transformation phases?

The same issue happens if I try to access them in the punctuate() method,
but I saw somewhere that this might not work in punctuate(). Any explanation
for this, or a link describing it, would be helpful.
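
The exception message ("should only be called while a record is processed")
suggests that record metadata is tied to the record currently in flight.
The following is a simplified model of that idea, not the actual Kafka
Streams implementation: the class RecordScopedContext and its methods are
hypothetical and exist only to show why a time-driven call could find no
record in scope.

```java
public class RecordScopedContext {

    /** Offset of the record currently being processed, or null when none is in scope. */
    private Long currentOffset = null;

    /** Mirrors the guard implied by the exception message in the report. */
    public long offset() {
        if (currentOffset == null) {
            throw new IllegalStateException(
                "offset() should only be called while a record is processed");
        }
        return currentOffset;
    }

    /** Runs the callback with a record in scope, then clears it again. */
    public void process(long recordOffset, Runnable callback) {
        currentOffset = recordOffset;
        try {
            callback.run();
        } finally {
            currentOffset = null;
        }
    }

    /** Runs the callback with no record in scope, as a time-driven call would. */
    public void punctuate(Runnable callback) {
        callback.run();
    }

    public static void main(String[] args) {
        RecordScopedContext context = new RecordScopedContext();

        // Inside process() a record is in scope, so the offset is readable.
        context.process(7L,
            () -> System.out.println("process sees offset " + context.offset()));

        // Inside punctuate() no record is in scope, so the guard throws.
        try {
            context.punctuate(() -> System.out.println(context.offset()));
        } catch (IllegalStateException e) {
            System.out.println("punctuate failed: " + e.getMessage());
        }
    }
}
```

This model would account for the punctuate() case only. It does not explain
the failure inside process() reported here, which is the unexpected part.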


====================================================================

java.lang.IllegalStateException: This should not happen as offset() should only be called while a record is processed
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
    at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
    at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]
=====================================================================


Regards
Sai

Re: Exception when accessing partition, offset and timestamp in processor class

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thx! :)

Re: Exception when accessing partition, offset and timestamp in processor class

Posted by saiprasad mishra <sa...@gmail.com>.
Just created the JIRA

https://issues.apache.org/jira/browse/KAFKA-4344

Regards
Sai

Re: Exception when accessing partition, offset and timestamp in processor class

Posted by saiprasad mishra <sa...@gmail.com>.
My JIRA id is saimishra

Regards
Sai

Re: Exception when accessing partition, offset and timestamp in processor class

Posted by "Matthias J. Sax" <ma...@confluent.io>.
What is your JIRA ID? We can add you to the contributor list to give
you permission.

-Matthias

Re: Exception when accessing partition, offset and timestamp in processor class

Posted by saiprasad mishra <sa...@gmail.com>.
Hi Matthias,
Thanks for the reply.
I don't think I have permission for this.
If you can grant me permission, I can create one (my handle is saimishra).
Or you can go ahead and create one.

I may need permission to create JIRA as I might report more issues after
discussing with you over here.

Regards
Sai

On Tue, Oct 25, 2016 at 10:26 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA512
>
> Hi,
>
> Sorry for the late reply. This seems like a bug to me; within
> Processor#process(), accessing the context should work. Can you open a
> JIRA for it?
>
> -Matthias
>
> On 10/23/16 10:28 PM, saiprasad mishra wrote:
> > Sorry for the email again.
> >
> > I was expecting it to always work when accessed from the process()
> > method, as each call corresponds to the processing of one Kafka
> > message/record. I understand the IllegalStateException when
> > punctuate() is called, as by then records are already batched by
> > time interval.
> >
> > Regards Sai
> >
>

Re: Exception when accessing partition, offset and timestamp in processor class

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi,

Sorry for the late reply. This seems like a bug to me; accessing the
context within Processor#process() should work. Can you open a
JIRA for it?

-Matthias

On 10/23/16 10:28 PM, saiprasad mishra wrote:
> Sorry for the email again
> 
> I was expecting it to always work when accessed from the process()
> method, as this corresponds to the processing of each Kafka message/record.
> I understand the IllegalStateException by the time punctuate() is called,
> as it is already batched by time interval.
> 
> Regards Sai
> 
> On Sun, Oct 23, 2016 at 9:18 PM, saiprasad mishra 
> <saiprasadmishra@gmail.com
>> wrote:
> 
>> Hi
>> 
>> This is with my streaming app on Kafka 0.10.1.0.
>>
>> My flow looks something like this:
>>
>> source topic stream -> filter for null values -> map to make it
>> keyed by id -> custom processor to mystore -> to another topic ->
>> ktable
>>
>> I am hitting the exception below in a custom processor
>> class if I try to access offset(), partition(), or timestamp()
>> from the ProcessorContext in the process() method. I was hoping
>> it would return the partition and offset for the enclosing
>> topic (in this case the source topic) that it is consuming from, or -1,
>> based on the API docs.
>>
>> It looks like they are accessible only in certain cases. Are they
>> getting lost in the transformation phases?
>>
>> The same issue happens if I try to access them in the punctuate()
>> method, but somewhere I saw that it might not work in
>> punctuate(). Any reason for this, or any link describing it,
>> would be helpful.
>> 
>> 
>> ====================================================================
>>
>> java.lang.IllegalStateException: This should not happen as offset() should only be called while a record is processed
>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
>> at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
>> at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
>> at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) ~[kafka-streams-0.10.1.0.jar!/:?]
>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
>> at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-0.10.1.0.jar!/:?]
>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
>> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.0.jar!/:?]
>> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.0.jar!/:?]
>> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]
>> =====================================================================
>>
>> Regards
>> Sai
>> 
> 

Re: Exception when accessing partition, offset and timestamp in processor class

Posted by saiprasad mishra <sa...@gmail.com>.
Sorry for the email again

I was expecting it to always work when accessed from the process() method, as
this corresponds to the processing of each Kafka message/record.
I understand the IllegalStateException by the time punctuate() is called, as
it is already batched by time interval.
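
Until the cause is known, one possible stopgap is to guard the metadata
lookup and fall back to -1 when no record is in flight. The sketch below is
only an illustration and does not use the real Kafka Streams classes:
SafeMetadata, FakeContext and SafeMetadataDemo are hypothetical names, and
FakeContext merely mimics the IllegalStateException seen in the stack trace.
The -1 sentinel is an assumption taken from the expectation described above.

```java
import java.util.function.LongSupplier;

// Hypothetical helper, not part of Kafka Streams: wraps a metadata lookup
// that may throw IllegalStateException when no record is being processed.
final class SafeMetadata {
    static final long UNKNOWN = -1L; // assumed sentinel for "not available"

    private SafeMetadata() {}

    static long orUnknown(LongSupplier lookup) {
        try {
            return lookup.getAsLong();
        } catch (IllegalStateException e) {
            // No record context available; report the sentinel instead of failing.
            return UNKNOWN;
        }
    }
}

// Stand-in for the processor context. It only reproduces the failure mode
// from the stack trace and is NOT the real ProcessorContext.
final class FakeContext {
    private Long currentOffset; // null means no record is in flight

    void startRecord(long offset) { currentOffset = offset; }

    void finishRecord() { currentOffset = null; }

    long offset() {
        if (currentOffset == null) {
            throw new IllegalStateException(
                "This should not happen as offset() should only be called while a record is processed");
        }
        return currentOffset;
    }
}

final class SafeMetadataDemo {
    public static void main(String[] args) {
        FakeContext context = new FakeContext();

        context.startRecord(42L);
        System.out.println(SafeMetadata.orUnknown(context::offset)); // prints 42

        context.finishRecord();
        System.out.println(SafeMetadata.orUnknown(context::offset)); // prints -1
    }
}
```

This only hides the symptom; it does not explain why the record context is
missing inside process() in the first place.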

Regards
Sai

On Sun, Oct 23, 2016 at 9:18 PM, saiprasad mishra <saiprasadmishra@gmail.com
> wrote:

> Hi
>
> This is with my streaming app on Kafka 0.10.1.0.
>
> My flow looks something like this:
>
> source topic stream -> filter for null values -> map to make it keyed by id
> -> custom processor to mystore -> to another topic -> ktable
>
> I am hitting the exception below in a custom processor class if I
> try to access offset(), partition(), or timestamp() from the
> ProcessorContext in the process() method. I was hoping it would return the
> partition and offset for the enclosing topic (in this case the source topic)
> that it is consuming from, or -1, based on the API docs.
>
> It looks like they are accessible only in certain cases. Are they getting
> lost in the transformation phases?
>
> The same issue happens if I try to access them in the punctuate() method, but
> somewhere I saw that it might not work in punctuate(). Any reason for this,
> or any link describing it, would be helpful.
>
>
> ====================================================================
>
> java.lang.IllegalStateException: This should not happen as offset() should only be called while a record is processed
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
> at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
> at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
> at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) ~[kafka-streams-0.10.1.0.jar!/:?]
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
> at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-0.10.1.0.jar!/:?]
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.0.jar!/:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.0.jar!/:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]
> =====================================================================
>
>
> Regards
> Sai
>