Posted to users@kafka.apache.org by Eric Lalonde <er...@autonomic.ai> on 2019/01/03 20:13:07 UTC

Re: Why do I get an IllegalStateException when accessing record metadata?


> On Jan 2, 2019, at 6:31 AM, Matthias J. Sax <ma...@confluent.io> wrote:
> 
> Thanks for reporting this. Feel free to edit the Wiki with the FAQ directly.
> 
> What is unclear to me: what do you mean by "the state store [...] was
> errantly scoped to the TransformerProvider, not the Transformer" ?
> 
> I would like to understand the actual issue.

See this gist: https://gist.github.com/elalonde/5b7be53973b5b847bac86754077b1a80#file-gistfile1-txt-L4

Because MyStore is declared in the parent supplier, it is shared across tasks, even though get() instantiates a new MyTransformer() for each task. It should have been declared in the MyTransformer sub-class (say, around line 15).
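
A minimal sketch of the difference, using simplified stand-ins rather than the real Kafka Streams types. Context, Transformer, TaskContext and both supplier classes are hypothetical names chosen for illustration; they are not taken from the gist. Each TaskContext plays the role of one task's context and hands out that task's own store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for the Kafka Streams types; NOT the real API.
interface Context {
    Map<String, Long> getStateStore(String name);
}

interface Transformer {
    void init(Context context);
    Map<String, Long> store();
}

// One context per task, each owning a separate store.
final class TaskContext implements Context {
    private final Map<String, Long> store = new HashMap<>();
    public Map<String, Long> getStateStore(String name) { return store; }
}

// Problematic: the field belongs to the supplier, so every transformer
// returned by get() reads and writes the same reference.
final class SharedFieldSupplier {
    private Map<String, Long> store;

    Transformer get() {
        return new Transformer() {
            public void init(Context context) { store = context.getStateStore("my-store"); }
            public Map<String, Long> store() { return store; }
        };
    }
}

// Intended: the field belongs to each transformer instance.
final class PerInstanceSupplier {
    Transformer get() {
        return new Transformer() {
            private Map<String, Long> store;
            public void init(Context context) { store = context.getStateStore("my-store"); }
            public Map<String, Long> store() { return store; }
        };
    }
}

final class ScopeDemo {
    // Initializes two transformers with two different task contexts and
    // reports whether they ended up holding the same store reference.
    static boolean sharesStore(Transformer a, Transformer b) {
        a.init(new TaskContext());
        b.init(new TaskContext());
        return a.store() == b.store();
    }

    public static void main(String[] args) {
        SharedFieldSupplier shared = new SharedFieldSupplier();
        PerInstanceSupplier perInstance = new PerInstanceSupplier();
        System.out.println(ScopeDemo.sharesStore(shared.get(), shared.get()));           // true
        System.out.println(ScopeDemo.sharesStore(perInstance.get(), perInstance.get())); // false
    }
}
```

In the first supplier, get() does return a new transformer each time, yet both transformers still end up using whichever store was assigned last.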


> -Matthias
> 
> On 12/31/18 2:36 AM, Eric Lalonde wrote:
>> Recently I encountered IllegalStateExceptions in my KafkaTransformer. My
>> application was exhibiting all of the behavior as discussed in the FAQ
>> (https://cwiki.apache.org/confluence/display/KAFKA/FAQ), under the section:
>> 
>> Why do I get an IllegalStateException when accessing record metadata? 
>> 
>> However, the root cause of my problem was not due to the lack of
>> instantiating a new transformer for each task. The root cause of my
>> mistake was a bit more insidious: the state store that I instantiated
>> was errantly scoped to the TransformerProvider, not the Transformer. Of
>> course when I finally realized my error, the problem was obvious, but it
>> was not immediately obvious. May I suggest extending the FAQ to help
>> others as well? Perhaps it would be helpful to extend the aforementioned
>> FAQ section in the following way (highlighted text is my addition):
>> 
>> 
>>    Why do I get an IllegalStateException when accessing record metadata? 
>> 
>>    If you attach a new |Processor/Transformer/ValueTransformer| to your
>>    topology using a corresponding supplier, you need to make sure that
>>    the supplier returns a /new/ instance each time |get()| is called.
>>    If you return the same object, a
>>    single |Processor/Transformer/ValueTransformer| would be shared over
>>    multiple tasks resulting in an |IllegalStateException| with error
>>    message |"This should not happen as topic() should only be called
>>    while a record is processed"| (depending on the method you are
>>    calling it could also be |partition()|, |offset()|,
>>    or |timestamp()| instead of |topic()|). Additionally, all
>>    instantiated state stores must be scoped to the inner
>>    Processor/Transformer/ValueTransformer class, and not to the parent
>>    Provider class. Scoping state stores to the parent class will result
>>    in state store re-use across tasks, which will also result
>>    in IllegalStateExceptions.
>> 
>> 
>> Hope this saves someone else from making the same mistake :)
>> 
>> - Eric
> 


Re: Why do I get an IllegalStateException when accessing record metadata?

Posted by Peter Levart <pe...@gmail.com>.
Hi,

I suggest the following:

If you attach a new Processor/Transformer/ValueTransformer to your 
topology using a corresponding supplier, you need to make sure that the 
supplier returns a new instance each time get() is called. If you return 
the same object, a single Processor/Transformer/ValueTransformer would 
be shared over multiple tasks resulting in an IllegalStateException with 
error message "This should not happen as topic() should only be called 
while a record is processed" (depending on the method you are calling it 
could also be partition(), offset(), or timestamp() instead of topic()).

...is appended with:

Additionally, all state stores to be used in a particular 
Processor/Transformer instance's process/transform callbacks must be 
obtained from ProcessorContext in the same Processor/Transformer 
instance (typically in its init() method).

The same holds for scheduled Punctuator callbacks if they use any state 
stores - they should only use the state stores obtained from the same 
Processor/Transformer instance's ProcessorContext as they were scheduled 
from.


I'm sure about the 1st statement, but intuitively I think the 2nd should
also hold. The Processor/Transformer API is structured in a way that
encourages such usage, but it can't prevent invalid usages such as
Eric's.
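
A rough sketch of the usage described by the two appended statements, again with simplified stand-ins instead of the real Kafka Streams classes. FakeContext, CountingTransformer and the "counts" store name are hypothetical; the real ProcessorContext has a richer API. The transformer obtains its store in init() and keeps it in an instance field, and the punctuator it schedules only touches that same store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a per-task processor context; NOT the real API.
final class FakeContext {
    private final Map<String, Integer> store = new HashMap<>();
    private final List<Runnable> punctuators = new ArrayList<>();

    Map<String, Integer> getStateStore(String name) { return store; }
    void schedule(Runnable punctuator) { punctuators.add(punctuator); }
    void firePunctuators() { punctuators.forEach(Runnable::run); }
}

final class CountingTransformer {
    // Scoped to this instance, assigned once in init().
    private Map<String, Integer> store;

    void init(FakeContext context) {
        store = context.getStateStore("counts");
        // The punctuator uses the store obtained from the same context
        // it was scheduled from.
        context.schedule(() -> store.clear());
    }

    void transform(String key) { store.merge(key, 1, Integer::sum); }

    int count(String key) { return store.getOrDefault(key, 0); }
}

final class PunctuatorDemo {
    public static void main(String[] args) {
        FakeContext task0 = new FakeContext();
        FakeContext task1 = new FakeContext();
        CountingTransformer t0 = new CountingTransformer();
        CountingTransformer t1 = new CountingTransformer();
        t0.init(task0);
        t1.init(task1);

        t0.transform("a");
        t1.transform("a");
        t1.transform("a");

        task0.firePunctuators(); // clears only task 0's store
        System.out.println(t0.count("a") + " " + t1.count("a")); // prints "0 2"
    }
}
```

Because nothing is held outside the instance, firing one task's punctuator leaves the other task's store untouched.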

What Eric did wrong was to assign a reference to the state store obtained in
Transformer.init() to a shared field outside the Transformer instance.
This is wrong in several ways, including:
- the field's value was overwritten each time a new Transformer instance
had its init() method invoked, so only the last value assigned was kept
- the field's value might have been overwritten by one thread while being
read by other threads, which could lead to data races
(here I'm not considering any advice given by the additional appended
statements above)

So my advice would be to keep an eye on correct programming in general 
and let the API lead you.

Regards, Peter

On 1/3/19 9:50 PM, Matthias J. Sax wrote:
> I see.
>
> When updating the FAQ, it should be clear what you mean. Your current
> proposal was unclear to me, and thus, it might be unclear to other
> users, too.
>
>
> -Matthias


Re: Why do I get an IllegalStateException when accessing record metadata?

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I see.

When updating the FAQ, it should be clear what you mean. Your current
proposal was unclear to me, and thus, it might be unclear to other
users, too.


-Matthias
