Posted to commits@samza.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2015/02/16 21:16:13 UTC

[jira] [Commented] (SAMZA-567) Can't interact with KV store from InitiableTask.init()

    [ https://issues.apache.org/jira/browse/SAMZA-567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14323232#comment-14323232 ] 

Chris Riccomini commented on SAMZA-567:
---------------------------------------

I took a quick look at this. I think we should just change the startup ordering in SamzaContainer to be:

{noformat}
      startMetrics
      startOffsetManager
      startStores
      startProducers
      startConsumers
      startTask
{noformat}

Similarly, we should change the shutdown ordering to be:

{noformat}
      shutdownConsumers
      shutdownProducers
      shutdownTask
      shutdownStores
      shutdownOffsetManager
      shutdownMetrics
{noformat}

I think this should be safe to do. It will let us treat the init() method just like the process() method, which makes a lot of sense: init() is essentially the first invocation of process(), just without a message.
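
As a rough illustration of the ordering above (not the actual SamzaContainer code), here is a minimal Java sketch. The class ContainerOrderingSketch and its helpers are hypothetical; only the step names are taken from the lists above. It records each step and notes whether producers were already started when the stand-in for init() ran.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SamzaContainer; only the step names come from
// the proposed ordering. The real class does actual work in each step.
public class ContainerOrderingSketch {
    private final List<String> steps = new ArrayList<>();
    private boolean producersStarted = false;
    private boolean initSawProducers = false;

    // Record a lifecycle step in the order it was executed.
    private void step(String name) {
        steps.add(name);
    }

    // Stand-in for the task's init(): remembers whether producers were running.
    private void init() {
        initSawProducers = producersStarted;
    }

    // Proposed startup order: producers come up before the task is started.
    public void run() {
        step("startMetrics");
        step("startOffsetManager");
        step("startStores");
        step("startProducers");
        producersStarted = true;
        step("startConsumers");
        step("startTask");
        init();
    }

    // Proposed shutdown order, exactly as listed in the comment.
    public void shutdown() {
        step("shutdownConsumers");
        step("shutdownProducers");
        producersStarted = false;
        step("shutdownTask");
        step("shutdownStores");
        step("shutdownOffsetManager");
        step("shutdownMetrics");
    }

    public List<String> steps() {
        return steps;
    }

    public boolean initSawProducers() {
        return initSawProducers;
    }

    public static void main(String[] args) {
        ContainerOrderingSketch container = new ContainerOrderingSketch();
        container.run();
        container.shutdown();
        System.out.println(container.steps());
        System.out.println("init() saw started producers: " + container.initSawProducers());
    }
}
```

With this ordering, anything init() does that ends up producing a message (such as a changelog write from a KV store) happens after startProducers, the same as it would from process().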

> Can't interact with KV store from InitiableTask.init()
> ------------------------------------------------------
>
>                 Key: SAMZA-567
>                 URL: https://issues.apache.org/jira/browse/SAMZA-567
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.8.0
>            Reporter: Tommy Becker
>             Fix For: 0.9.0
>
>
> Attempting to interact with the KeyValueStore from InitiableTask.init() results in a rather obscure exception:
> java.util.NoSuchElementException: key not found: TaskName-Partition 3
> 	at scala.collection.MapLike$class.default(MapLike.scala:228) ~[scala-library-2.10.1.jar:na]
> 	at scala.collection.AbstractMap.default(Map.scala:58) ~[scala-library-2.10.1.jar:na]
> 	at scala.collection.mutable.HashMap.apply(HashMap.scala:64) ~[scala-library-2.10.1.jar:na]
> 	at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:71) ~[samza-core_2.10-0.8.0.jar:na]
> 	at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61) ~[samza-core_2.10-0.8.0.jar:na]
> 	at org.apache.samza.storage.kv.LoggedStore.putAll(LoggedStore.scala:72) ~[samza-kv_2.10-0.8.0.jar:na]
> 	at org.apache.samza.storage.kv.SerializedKeyValueStore.putAll(SerializedKeyValueStore.scala:57) ~[samza-kv_2.10-0.8.0.jar:na]
> 	at org.apache.samza.storage.kv.CachedStore.flush(CachedStore.scala:159) ~[samza-kv_2.10-0.8.0.jar:na]
> 	at org.apache.samza.storage.kv.CachedStore$$anon$1.removeEldestEntry(CachedStore.scala:69) ~[samza-kv_2.10-0.8.0.jar:na]
> 	at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299) ~[na:1.8.0_25]
> 	at java.util.HashMap.putVal(HashMap.java:663) ~[na:1.8.0_25]
> 	at java.util.HashMap.put(HashMap.java:611) ~[na:1.8.0_25]
> 	at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:91) ~[samza-kv_2.10-0.8.0.jar:na]
> 	at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36) ~[samza-kv_2.10-0.8.0.jar:na]
> 	at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44) ~[samza-kv_2.10-0.8.0.jar:na]
> ...
> After some investigation I see that it's actually not safe to do anything that is going to potentially produce messages from init(),  because startTask is called before startProducers in SamzaContainer.run.  Interaction with the KV store results in writes to the changelog, resulting in the above exception.  Conceptually, it seems like the producers should be initialized first to prevent this, but I have no idea what the side-effects of doing that would be.  Minimally, I'd like to see this behavior documented and a more obvious failure such as an IllegalStateException.
> Discussion that precipitated this issue:
> http://mail-archives.apache.org/mod_mbox/samza-dev/201502.mbox/%3c962D3CAB94174A4E9B771B88A9DFE7B10F1E15E2@SJEXMB02.Tivo.com%3e
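
The reporter's minimal request, a clearer failure than the NoSuchElementException above, could look roughly like the following Java sketch. FailFastProducersSketch and all of its methods are hypothetical and only illustrate the idea of a fail-fast guard; this is not how Samza's SystemProducers is implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical producer wrapper illustrating a fail-fast guard.
// It is NOT Samza's SystemProducers; names and behavior are assumptions.
public class FailFastProducersSketch {
    private final Map<String, List<String>> sentBySource = new HashMap<>();
    private boolean started = false;

    // Make a source (e.g. a task name) known to the producers.
    public void register(String source) {
        sentBySource.put(source, new ArrayList<>());
    }

    public void start() {
        started = true;
    }

    // Fail with a descriptive IllegalStateException instead of an obscure
    // "key not found" error when send() is called too early.
    public void send(String source, String message) {
        if (!started) {
            throw new IllegalStateException(
                "Cannot send from '" + source + "': producers have not been started yet");
        }
        List<String> sent = sentBySource.get(source);
        if (sent == null) {
            throw new IllegalStateException("Source '" + source + "' was never registered");
        }
        sent.add(message);
    }

    // Number of messages accepted for a source (0 if the source is unknown).
    public int sentCount(String source) {
        List<String> sent = sentBySource.get(source);
        return sent == null ? 0 : sent.size();
    }

    public static void main(String[] args) {
        FailFastProducersSketch producers = new FailFastProducersSketch();
        producers.register("TaskName-Partition 3");
        try {
            producers.send("TaskName-Partition 3", "changelog write from init()");
        } catch (IllegalStateException e) {
            System.out.println("Rejected early send: " + e.getMessage());
        }
        producers.start();
        producers.send("TaskName-Partition 3", "changelog write after start");
        System.out.println("Messages sent: " + producers.sentCount("TaskName-Partition 3"));
    }
}
```

A guard like this only improves the error message; it does not make producing from init() work, which is what reordering the container startup addresses.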


