Posted to users@kafka.apache.org by "Matthias J. Sax" <ma...@confluent.io> on 2019/01/02 14:31:41 UTC
Re: Why do I get an IllegalStateException when accessing record metadata?
Thanks for reporting this. Feel free to edit the Wiki with the FAQ directly.
What is unclear to me: what do you mean by "the state store [...] was
errantly scoped to the TransformerProvider, not the Transformer" ?
I would like to understand the actual issue.
-Matthias
On 12/31/18 2:36 AM, Eric Lalonde wrote:
> Recently I encountered IllegalStateExceptions in my KafkaTransformer. My
> application was exhibiting all of the behavior as discussed in the FAQ
> (https://cwiki.apache.org/confluence/display/KAFKA/FAQ), under the section:
>
> Why do I get an IllegalStateException when accessing record metadata?
>
> However, the root cause of my problem was not a failure to
> instantiate a new transformer for each task. The root cause of my
> mistake was more insidious: the state store that I instantiated
> was errantly scoped to the TransformerProvider, not the Transformer. In
> hindsight the problem was obvious, but it was not obvious at first.
> May I suggest extending the FAQ to help others as well? Perhaps it
> would be helpful to extend the aforementioned FAQ section in the
> following way (my addition is the part starting with "Additionally"):
>
>
> Why do I get an IllegalStateException when accessing record metadata?
>
> If you attach a new |Processor/Transformer/ValueTransformer| to your
> topology using a corresponding supplier, you need to make sure that
> the supplier returns a /new/ instance each time |get()| is called.
> If you return the same object, a
> single |Processor/Transformer/ValueTransformer| would be shared over
> multiple tasks resulting in an |IllegalStateException| with error
> message |"This should not happen as topic() should only be called
> while a record is processed"| (depending on the method you are
> calling it could also be |partition()|, |offset()|,
> or |timestamp()| instead of |topic()|). Additionally, all
> instantiated state stores must be scoped to the inner
> Processor/Transformer/ValueTransformer class, and not to the parent
> Provider class. Scoping state stores to the parent class will result
> in state store re-use across tasks, which will also result
> in IllegalStateExceptions.
>
>
> Hope this saves someone else from making the same mistake :)
>
> - Eric
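To illustrate the first point of the FAQ entry, here is a minimal plain-Java model. It is not Kafka Streams code: ModelContext, ModelTransformer, SharedSupplier and FreshSupplier are hypothetical stand-ins for ProcessorContext, Transformer and TransformerSupplier, and java.util.function.Supplier replaces TransformerSupplier so the sketch compiles without the Kafka jar. It only shows that a supplier returning one cached object hands every task the same instance, so the last init() call wins. In Kafka Streams, that is presumably how one task ends up reading record metadata through another task's context.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a per-task ProcessorContext (not the Kafka Streams class).
class ModelContext {
    final String taskId;

    ModelContext(String taskId) { this.taskId = taskId; }
}

// Hypothetical stand-in for a Transformer that keeps the context passed to init().
class ModelTransformer {
    private ModelContext context;

    void init(ModelContext context) { this.context = context; }

    String currentTask() { return context.taskId; }
}

// Anti-pattern: every call to get() returns the same object.
class SharedSupplier implements Supplier<ModelTransformer> {
    private final ModelTransformer single = new ModelTransformer();

    @Override
    public ModelTransformer get() { return single; }
}

// Correct pattern: every call to get() creates a new instance.
class FreshSupplier implements Supplier<ModelTransformer> {
    @Override
    public ModelTransformer get() { return new ModelTransformer(); }
}

class SupplierDemo {
    public static void main(String[] args) {
        ModelContext task00 = new ModelContext("0_0");
        ModelContext task01 = new ModelContext("0_1");

        SharedSupplier shared = new SharedSupplier();
        ModelTransformer a = shared.get();
        ModelTransformer b = shared.get(); // same object as a
        a.init(task00);
        b.init(task01); // re-initializes the object task 0_0 is also using
        System.out.println("shared: " + a.currentTask() + ", " + b.currentTask()); // shared: 0_1, 0_1

        FreshSupplier fresh = new FreshSupplier();
        ModelTransformer c = fresh.get();
        ModelTransformer d = fresh.get();
        c.init(task00);
        d.init(task01);
        System.out.println("fresh: " + c.currentTask() + ", " + d.currentTask()); // fresh: 0_0, 0_1
    }
}
```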
Re: Why do I get an IllegalStateException when accessing record metadata?
Posted by Peter Levart <pe...@gmail.com>.
Hi,
I suggest the following:
If you attach a new Processor/Transformer/ValueTransformer to your
topology using a corresponding supplier, you need to make sure that the
supplier returns a new instance each time get() is called. If you return
the same object, a single Processor/Transformer/ValueTransformer would
be shared over multiple tasks resulting in an IllegalStateException with
error message "This should not happen as topic() should only be called
while a record is processed" (depending on the method you are calling it
could also be partition(), offset(), or timestamp() instead of topic()).
...be extended with the following:
Additionally, all state stores to be used in a particular
Processor/Transformer instance's process/transform callbacks must be
obtained from ProcessorContext in the same Processor/Transformer
instance (typically in its init() method).
The same holds for scheduled Punctuator callbacks if they use any state
stores: they should only use state stores obtained from the
ProcessorContext of the same Processor/Transformer instance that
scheduled them.
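A minimal plain-Java model of the Punctuator rule, assuming that each task's context owns both its store and its scheduled callbacks. ModelContext.schedule(Runnable) and punctuate() are hypothetical stand-ins and do not match the real ProcessorContext.schedule(...) signature. The only point is that the callback registered in init() reads the store field of the same instance that scheduled it, so each task's punctuation sees only that task's data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-task context owning one store and the callbacks scheduled on
// that task. A simplified model, not the Kafka Streams ProcessorContext API.
class ModelContext {
    final String taskId;
    private final Map<String, Long> store = new HashMap<>();
    private final List<Runnable> punctuators = new ArrayList<>();

    ModelContext(String taskId) { this.taskId = taskId; }

    Map<String, Long> getStateStore(String name) { return store; }

    void schedule(Runnable callback) { punctuators.add(callback); }

    // Fires every callback scheduled on this task.
    void punctuate() { punctuators.forEach(Runnable::run); }
}

class ModelTransformer {
    private Map<String, Long> store; // set in init(), owned by this instance
    private final List<String> emitted = new ArrayList<>();

    void init(ModelContext context) {
        store = context.getStateStore("counts");
        // The callback reads this instance's store field, i.e. the store
        // obtained from the same context that scheduled it.
        context.schedule(() -> emitted.add(context.taskId + " total=" + total()));
    }

    void transform(String key) { store.merge(key, 1L, Long::sum); }

    long total() { return store.values().stream().mapToLong(Long::longValue).sum(); }

    List<String> emitted() { return emitted; }
}

class PunctuatorDemo {
    public static void main(String[] args) {
        ModelContext t0 = new ModelContext("0_0");
        ModelContext t1 = new ModelContext("0_1");
        ModelTransformer a = new ModelTransformer();
        ModelTransformer b = new ModelTransformer();
        a.init(t0);
        b.init(t1);
        a.transform("x");
        a.transform("y");
        b.transform("x");
        t0.punctuate();
        t1.punctuate();
        System.out.println(a.emitted() + " " + b.emitted()); // [0_0 total=2] [0_1 total=1]
    }
}
```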
I'm sure about the first statement, but intuitively I think the second
should also hold. The Processor/Transformer API is structured in a way that
encourages such usage, but it can't prevent invalid usage such as
Eric's.
What Eric did wrong was to assign a reference to the state store obtained
in Transformer.init() to a shared field outside the Transformer instance.
This is wrong in several ways, among them:
- the field's value got overwritten each time a new Transformer instance
had its init() method invoked, so only the last value assigned was kept
- the field's value might have been written from multiple threads
while being read from multiple threads, which could lead to data races
(here I'm not considering the advice given in the appended
statements above)
So my advice would be to keep an eye on correct programming in general
and let the API lead you.
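The first failure mode in the list above can be reproduced without Kafka. In this sketch the classes ModelStore, ModelContext, ModelTransformer and LeakySupplier are hypothetical plain-Java stand-ins, not the Kafka Streams API. get() does return a new transformer each time, but the store reference lives in a field of the supplier, so the second init() overwrites it and both transformers write to task 0_1's store. The threading problem from the second bullet is not modelled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical per-task store that remembers which task owns it (not the Kafka Streams API).
class ModelStore {
    final String ownerTask;
    final Map<String, Long> counts = new HashMap<>();

    ModelStore(String ownerTask) { this.ownerTask = ownerTask; }
}

// Hypothetical per-task context: each task hands out only its own store.
class ModelContext {
    private final ModelStore store;

    ModelContext(String taskId) { this.store = new ModelStore(taskId); }

    ModelStore getStateStore(String name) { return store; }
}

interface ModelTransformer {
    void init(ModelContext context);

    String transform(String key);
}

// The mistake: get() returns a new transformer, but the store reference is a
// field of the supplier, so every transformer shares it.
class LeakySupplier implements Supplier<ModelTransformer> {
    private ModelStore sharedStore;

    @Override
    public ModelTransformer get() {
        return new ModelTransformer() {
            @Override
            public void init(ModelContext context) {
                // Overwrites the reference stored by the previous task's init().
                sharedStore = context.getStateStore("counts");
            }

            @Override
            public String transform(String key) {
                long n = sharedStore.counts.merge(key, 1L, Long::sum);
                return sharedStore.ownerTask + ":" + key + "=" + n;
            }
        };
    }
}

class LeakyDemo {
    public static void main(String[] args) {
        LeakySupplier supplier = new LeakySupplier();
        ModelTransformer forTask00 = supplier.get();
        ModelTransformer forTask01 = supplier.get();
        forTask00.init(new ModelContext("0_0"));
        forTask01.init(new ModelContext("0_1"));
        System.out.println(forTask00.transform("a")); // 0_1:a=1 (task 0_0 wrote to task 0_1's store)
        System.out.println(forTask01.transform("a")); // 0_1:a=2
    }
}
```

Moving sharedStore into the anonymous class, so that each transformer has its own field, makes each one write to its own task's store.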
Regards, Peter
On 1/3/19 9:50 PM, Matthias J. Sax wrote:
> I see.
>
> When updating the FAQ, it should be clear what you mean. Your current
> proposal was unclear to me, and thus, it might be unclear to other
> users, too.
>
>
> -Matthias
Re: Why do I get an IllegalStateException when accessing record metadata?
Posted by "Matthias J. Sax" <ma...@confluent.io>.
I see.
When updating the FAQ, it should be clear what you mean. Your current
proposal was unclear to me, and thus, it might be unclear to other
users, too.
-Matthias
On 1/3/19 9:13 PM, Eric Lalonde wrote:
> See this
> gist: https://gist.github.com/elalonde/5b7be53973b5b847bac86754077b1a80#file-gistfile1-txt-L4
>
> Because MyStore is declared in the parent supplier, it will be shared
> across tasks, even though the .get() function is instantiating a new
> MyTransformer() for each task. It should have been declared in the
> MyTransformer class (say, around line 15).
Re: Why do I get an IllegalStateException when accessing record metadata?
Posted by Eric Lalonde <er...@autonomic.ai>.
> On Jan 2, 2019, at 6:31 AM, Matthias J. Sax <ma...@confluent.io> wrote:
>
> Thanks for reporting this. Feel free to edit the Wiki with the FAQ directly.
>
> What is unclear to me: what do you mean by "the state store [...] was
> errantly scoped to the TransformerProvider, not the Transformer" ?
>
> I would like to understand the actual issue.
See this gist: https://gist.github.com/elalonde/5b7be53973b5b847bac86754077b1a80#file-gistfile1-txt-L4
Because MyStore is declared in the parent supplier, it will be shared across tasks, even though the .get() function instantiates a new MyTransformer() for each task. It should have been declared in the MyTransformer class (say, around line 15).
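The gist itself is not reproduced here. As a hedged sketch of the fix being described, the store reference below is declared inside MyTransformer, so each instance returned by get() keeps its own store. MyTransformer is the name used in the message. MySupplier, ModelStore and ModelContext are hypothetical plain-Java stand-ins for the supplier, the state store and ProcessorContext, and they are not the Kafka Streams API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical per-task store that remembers which task owns it.
class ModelStore {
    final String ownerTask;
    final Map<String, Long> counts = new HashMap<>();

    ModelStore(String ownerTask) { this.ownerTask = ownerTask; }
}

// Hypothetical per-task context: each task hands out only its own store.
class ModelContext {
    private final ModelStore store;

    ModelContext(String taskId) { this.store = new ModelStore(taskId); }

    ModelStore getStateStore(String name) { return store; }
}

// The store reference is declared inside the transformer, so each
// instance (and therefore each task) keeps its own.
class MyTransformer {
    private ModelStore store;

    void init(ModelContext context) { store = context.getStateStore("counts"); }

    String transform(String key) {
        long n = store.counts.merge(key, 1L, Long::sum);
        return store.ownerTask + ":" + key + "=" + n;
    }
}

// Hypothetical supplier: holds no per-task state and only creates instances.
class MySupplier implements Supplier<MyTransformer> {
    @Override
    public MyTransformer get() { return new MyTransformer(); }
}

class ScopedDemo {
    public static void main(String[] args) {
        MySupplier supplier = new MySupplier();
        MyTransformer forTask00 = supplier.get();
        MyTransformer forTask01 = supplier.get();
        forTask00.init(new ModelContext("0_0"));
        forTask01.init(new ModelContext("0_1"));
        System.out.println(forTask00.transform("a")); // 0_0:a=1
        System.out.println(forTask01.transform("a")); // 0_1:a=1
    }
}
```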