Posted to dev@samza.apache.org by Tommy Becker <to...@Tivo.com> on 2015/02/16 20:17:53 UTC

Not safe to access KV stores from InitiableTask.init()

I need to do some initial processing of the entries in my KV store on startup, before processing any messages. I put the code into my task's init() method, and although it worked with an empty KV store/changelog, once there are entries in there it bombs with a rather obscure exception:

java.util.NoSuchElementException: key not found: TaskName-Partition 3
at scala.collection.MapLike$class.default(MapLike.scala:228) ~[scala-library-2.10.1.jar:na]
at scala.collection.AbstractMap.default(Map.scala:58) ~[scala-library-2.10.1.jar:na]
at scala.collection.mutable.HashMap.apply(HashMap.scala:64) ~[scala-library-2.10.1.jar:na]
at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:71) ~[samza-core_2.10-0.8.0.jar:na]
at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61) ~[samza-core_2.10-0.8.0.jar:na]
at org.apache.samza.storage.kv.LoggedStore.putAll(LoggedStore.scala:72) ~[samza-kv_2.10-0.8.0.jar:na]
at org.apache.samza.storage.kv.SerializedKeyValueStore.putAll(SerializedKeyValueStore.scala:57) ~[samza-kv_2.10-0.8.0.jar:na]
at org.apache.samza.storage.kv.CachedStore.flush(CachedStore.scala:159) ~[samza-kv_2.10-0.8.0.jar:na]
at org.apache.samza.storage.kv.CachedStore$$anon$1.removeEldestEntry(CachedStore.scala:69) ~[samza-kv_2.10-0.8.0.jar:na]
at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299) ~[na:1.8.0_25]
at java.util.HashMap.putVal(HashMap.java:663) ~[na:1.8.0_25]
at java.util.HashMap.put(HashMap.java:611) ~[na:1.8.0_25]
at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:91) ~[samza-kv_2.10-0.8.0.jar:na]
at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36) ~[samza-kv_2.10-0.8.0.jar:na]
at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44) ~[samza-kv_2.10-0.8.0.jar:na]
...
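Reading the trace from the bottom up, the failing call is a read: KeyValueStorageEngine.get() reaches CachedStore.get(), whose insert into the cache triggers removeEldestEntry(), which calls CachedStore.flush(), which goes through LoggedStore.putAll() to the changelog producer. The toy model below reproduces that chain with java.util.LinkedHashMap. It is a sketch under assumptions: CacheEntry and WriteBackCache are hypothetical names, not Samza's CachedStore, and the rule "evicting a dirty entry flushes every dirty entry" is how this model behaves, not a statement about Samza's exact flush policy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of a write-back LRU cache in front of a
// changelog-backed store. Names are illustrative; this is not Samza's code.
public class CacheEvictionDemo {
    static final class CacheEntry {
        final String value;
        boolean dirty; // written to the cache but not yet flushed

        CacheEntry(String value, boolean dirty) {
            this.value = value;
            this.dirty = dirty;
        }
    }

    static final class WriteBackCache {
        private final int capacity;
        private final Map<String, String> backing; // underlying store
        private final List<String> changelog;      // stands in for the changelog producer
        private final LinkedHashMap<String, CacheEntry> cache;

        WriteBackCache(int capacity, Map<String, String> backing, List<String> changelog) {
            this.capacity = capacity;
            this.backing = backing;
            this.changelog = changelog;
            // accessOrder = true: iteration runs from least to most recently used.
            this.cache = new LinkedHashMap<String, CacheEntry>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, CacheEntry> eldest) {
                    boolean evict = size() > WriteBackCache.this.capacity;
                    // In this model, evicting a dirty entry flushes all dirty
                    // entries, so a read can end up sending to the changelog.
                    if (evict && eldest.getValue().dirty) {
                        flush();
                    }
                    return evict;
                }
            };
        }

        void put(String key, String value) {
            cache.put(key, new CacheEntry(value, true));
        }

        String get(String key) {
            CacheEntry hit = cache.get(key);
            if (hit != null) {
                return hit.value;
            }
            String value = backing.get(key);
            if (value != null) {
                // Cache miss: inserting the loaded value may evict the eldest entry.
                cache.put(key, new CacheEntry(value, false));
            }
            return value;
        }

        void flush() {
            // Walks entrySet() instead of calling cache.get(), which would
            // reorder an access-ordered map during iteration.
            for (Map.Entry<String, CacheEntry> e : cache.entrySet()) {
                CacheEntry entry = e.getValue();
                if (entry.dirty) {
                    backing.put(e.getKey(), entry.value);
                    changelog.add(e.getKey() + "=" + entry.value);
                    entry.dirty = false;
                }
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> backing = new HashMap<>();
        backing.put("c", "3");
        List<String> changelog = new ArrayList<>();
        WriteBackCache store = new WriteBackCache(2, backing, changelog);

        store.put("a", "1");
        store.put("b", "2");
        System.out.println("before read: " + changelog); // before read: []
        store.get("c"); // a read whose cache insert evicts dirty "a"
        System.out.println("after read:  " + changelog); // after read:  [a=1, b=2]
    }
}
```

With a capacity of 2, the two puts stay in the cache; the read of "c" evicts the dirty "a", so two changelog messages go out even though the last call was a get(). If the producers have not been started when this happens, that send is the one that fails.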

After some investigation, I see that it's actually not safe to interact with anything that may produce messages from init(), since startTask is called before startProducers in SamzaContainer.run. In retrospect, I guess that is why a MessageCollector is not passed to init(), but of course writes to the KV store result in messages being sent to the changelog :/ So my question is whether this is intended behavior (could we not simply initialize producers before tasks?) and, if so, what an alternative might be for my use case. As things stand, it seems all I can do is add an "initProcessingDone" flag to my task and check it every time a message comes in.
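The "initProcessingDone" workaround described above can be sketched as a self-contained model. Producers and DeferredTask are hypothetical stand-ins, not Samza classes, and Producers.send() is assumed to fail for an unregistered source to mimic the "key not found" error in the trace. init() does only local setup; the one-time pass runs on the first process() call, by which point the producers have been started.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical sketch of the "initProcessingDone" workaround. Producers and
// DeferredTask are simplified stand-ins, not Samza classes.
public class DeferredInitDemo {
    // Mimics a producer multiplexer: sending on behalf of a source that has
    // not been registered yet fails, like the "key not found" in the trace.
    static final class Producers {
        private final Map<String, List<String>> sent = new HashMap<>();

        void register(String source) {
            sent.put(source, new ArrayList<>());
        }

        void send(String source, String message) {
            List<String> out = sent.get(source);
            if (out == null) {
                throw new NoSuchElementException("key not found: " + source);
            }
            out.add(message);
        }

        List<String> sentBy(String source) {
            return sent.get(source);
        }
    }

    static final class DeferredTask {
        private final String name;
        private final Producers producers;
        private boolean initProcessingDone = false;

        DeferredTask(String name, Producers producers) {
            this.name = name;
            this.producers = producers;
        }

        void init() {
            // Local setup only; nothing here may send messages, because the
            // producers have not been started yet.
        }

        void process(String message) {
            // Run the one-time pass on the first message instead of in init().
            if (!initProcessingDone) {
                initialPass();
                initProcessingDone = true;
            }
            producers.send(name, "processed " + message);
        }

        private void initialPass() {
            // Stands in for iterating the KV store and writing results back,
            // which would emit changelog messages.
            producers.send(name, "initial pass");
        }
    }

    public static void main(String[] args) {
        String source = "TaskName-Partition 3";
        Producers producers = new Producers();
        DeferredTask task = new DeferredTask(source, producers);

        // Order described in the thread: tasks start before producers.
        task.init();                // safe: sends nothing
        producers.register(source); // the "startProducers" step
        task.process("m1");
        task.process("m2");
        System.out.println(producers.sentBy(source)); // [initial pass, processed m1, processed m2]
    }
}
```

The cost is the per-message boolean check the post mentions. In a real StreamTask, the same check would sit at the top of process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator).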

________________________________

This email and any attachments may contain confidential and privileged material for the sole use of the intended recipient. Any review, copying, or distribution of this email (or any attachments) by others is prohibited. If you are not the intended recipient, please contact the sender immediately and permanently delete this email and any attachments. No employee or agent of TiVo Inc. is authorized to conclude any binding agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo Inc. may only be made by a signed written agreement.

RE: Not safe to access KV stores from InitiableTask.init()

Posted by Tommy Becker <to...@Tivo.com>.
Thanks, Chris. Here's the JIRA:

https://issues.apache.org/jira/browse/SAMZA-567

I'm working remotely today and likely tomorrow, so you'll probably beat me to it ;)
________________________________________
From: Chris Riccomini [criccomini@apache.org]
Sent: Monday, February 16, 2015 2:39 PM
To: dev@samza.apache.org
Subject: Re: Not safe to access KV stores from InitiableTask.init()

Hey Tommy,

This sounds broken. Let me have a look and see if there's an easy fix. I
*think* reordering should work, but I just want to make sure.

Could you open a JIRA and set the fix version to 0.9.0? I'll take a look
today/tomorrow. If you want to test out the reordering, please share any
findings. :)

Cheers,
Chris

Re: Not safe to access KV stores from InitiableTask.init()

Posted by Chris Riccomini <cr...@apache.org>.
Hey Tommy,

This sounds broken. Let me have a look and see if there's an easy fix. I
*think* reordering should work, but I just want to make sure.
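The reordering under discussion (starting producers before tasks, so that init() can safely write to logged stores) can be illustrated with a toy container. Container, startProducers(), startTasks() and send() are hypothetical stand-ins named after the SamzaContainer.run steps mentioned in the original post; this is not Samza's implementation, and it ignores anything else that may depend on startup order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;

// Toy model of the proposed fix: register/start producers before calling
// each task's init(). Method names echo the steps of SamzaContainer.run
// mentioned in the thread; this is not Samza's implementation.
public class StartupOrderDemo {
    static final class Container {
        private final Set<String> registeredSources = new HashSet<>();
        private final List<String> taskNames;
        private final boolean producersFirst;
        final List<String> log = new ArrayList<>();

        Container(List<String> taskNames, boolean producersFirst) {
            this.taskNames = taskNames;
            this.producersFirst = producersFirst;
        }

        void startProducers() {
            // Registering each task as a source is what makes send() legal.
            registeredSources.addAll(taskNames);
            log.add("startProducers");
        }

        void startTasks() {
            for (String task : taskNames) {
                // Models an init() that writes to a logged KV store, which
                // sends a changelog message on the task's behalf.
                send(task, "changelog write from init()");
            }
            log.add("startTask");
        }

        void send(String source, String message) {
            if (!registeredSources.contains(source)) {
                throw new NoSuchElementException("key not found: " + source);
            }
            log.add(source + ": " + message);
        }

        void run() {
            if (producersFirst) {
                startProducers();
                startTasks();
            } else {
                startTasks();
                startProducers();
            }
        }
    }

    static String tryRun(boolean producersFirst) {
        Container c = new Container(Arrays.asList("TaskName-Partition 3"), producersFirst);
        try {
            c.run();
            return "ok: " + c.log;
        } catch (NoSuchElementException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryRun(false)); // tasks first, as in the original report
        System.out.println(tryRun(true));  // producers first, the proposed reordering
    }
}
```

Running main prints "failed: key not found: TaskName-Partition 3" for the original order and "ok: [startProducers, TaskName-Partition 3: changelog write from init(), startTask]" once producers are started first.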

Could you open a JIRA and set the fix version to 0.9.0? I'll take a look
today/tomorrow. If you want to test out the reordering, please share any
findings. :)

Cheers,
Chris
