Posted to commits@samza.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2014/03/06 00:46:47 UTC

[jira] [Commented] (SAMZA-169) Task initialization fails if changelog stream does not already exist

    [ https://issues.apache.org/jira/browse/SAMZA-169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13921645#comment-13921645 ] 

Chris Riccomini commented on SAMZA-169:
---------------------------------------

Could we combine the two getSystemStreamMetadata methods into a single method whose second parameter defaults to retryBackoff = new ExponentialSleepStrategy(initialDelayMs = 500)?

Rest of the patch looks good to me.
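
For illustration, the single-method shape suggested above might look roughly like the sketch below. This is only a sketch under assumptions: the ExponentialSleepStrategy class here is a simplified stand-in defined locally (not Samza's implementation), and SystemAdminSketch, the streamNames parameter, and the Map[String, Long] return type are hypothetical names chosen so the example compiles on its own. The body just reports which initial delay would be used, so the effect of the default is visible.

```scala
// Simplified stand-in for the strategy class named in the comment.
// Assumption: this is NOT Samza's implementation, only enough to compile.
class ExponentialSleepStrategy(
    val initialDelayMs: Long = 100,
    val backOffMultiplier: Double = 2.0,
    val maximumDelayMs: Long = 10000)

// Hypothetical container object, used only for this sketch.
object SystemAdminSketch {
  // One method instead of two overloads: callers that do not care about the
  // backoff omit the second argument and get the 500 ms default.
  // Hypothetical return value: stream name -> initial delay that would apply.
  def getSystemStreamMetadata(
      streamNames: Set[String],
      retryBackoff: ExponentialSleepStrategy =
        new ExponentialSleepStrategy(initialDelayMs = 500)): Map[String, Long] =
    streamNames.map(name => name -> retryBackoff.initialDelayMs).toMap
}
```

Scala evaluates a default argument expression on every call that omits the argument, so each such call gets a fresh strategy instance rather than sharing one.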

> Task initialization fails if changelog stream does not already exist
> --------------------------------------------------------------------
>
>                 Key: SAMZA-169
>                 URL: https://issues.apache.org/jira/browse/SAMZA-169
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka, kv
>    Affects Versions: 0.6.0
>            Reporter: Martin Kleppmann
>            Assignee: Martin Kleppmann
>             Fix For: 0.7.0
>
>         Attachments: SAMZA-169.patch
>
>
> The first time you run a job that uses state with changelog (such as in SAMZA-152), you get the following exception:
> {noformat}
> 2014-03-04 18:15:20 SamzaContainer [ERROR] Caught exception in process loop.
> org.apache.samza.SamzaException: Missing a change log offset for SystemStreamPartition [partition=Partition [partition=1], system=kafka, stream=wikipedia-stats-changelog].
> 	at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:84)
> 	at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:84)
> 	at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> 	at scala.collection.AbstractMap.getOrElse(Map.scala:58)
> 	at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
> 	at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:81)
> 	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> 	at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> 	at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> 	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> 	at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> 	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> 	at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
> 	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> 	at org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:81)
> 	at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:60)
> 	at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:103)
> 	at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:579)
> 	at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:579)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> 	at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> 	at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:579)
> 	at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:504)
> 	at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:81)
> 	at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> {noformat}
> If debug logging is turned on, the following telltale line appears in the log earlier:
> {noformat}
> 2014-03-04 18:15:20 KafkaSystemAdmin [DEBUG] Got metadata for streams: Map(wikipedia-stats-changelog -> {TopicMetadata for topic wikipedia-stats-changelog -> 
> No partition metadata for topic wikipedia-stats-changelog due to kafka.common.LeaderNotAvailableException})
> {noformat}
> Full log: https://gist.github.com/ept/1fecad1b2d79797990a8
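
The debug line quoted above shows the metadata request coming back without partition metadata on the first attempt, and the review comment mentions a retryBackoff parameter, so one plausible remedy is to retry the fetch with exponentially growing delays. The sketch below illustrates that general idea only. It is not the attached patch: RetrySketch, retryWithBackoff, and all of its parameters are hypothetical, and the fetch function stands in for whatever call actually returns the stream metadata.

```scala
import scala.annotation.tailrec

// Hypothetical helper, for illustration only.
object RetrySketch {
  // Calls `fetch` until it yields Some(value) or the attempts run out.
  // Between attempts it sleeps for `delayMs`, then multiplies the delay,
  // capped at `maxDelayMs`. `sleep` is injectable so tests need not wait.
  @tailrec
  def retryWithBackoff[A](
      fetch: () => Option[A],
      delayMs: Long = 500,
      multiplier: Double = 2.0,
      maxDelayMs: Long = 10000,
      attemptsLeft: Int = 5,
      sleep: Long => Unit = ms => Thread.sleep(ms)): Option[A] = {
    fetch() match {
      case found @ Some(_) => found
      case None if attemptsLeft <= 1 => None // give up after the last attempt
      case None =>
        sleep(delayMs)
        val nextDelay = math.min((delayMs * multiplier).toLong, maxDelayMs)
        retryWithBackoff(fetch, nextDelay, multiplier, maxDelayMs, attemptsLeft - 1, sleep)
    }
  }
}
```

A caller would wrap the metadata lookup in `fetch`, returning None while the metadata is still unavailable and Some(metadata) once it is.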



--
This message was sent by Atlassian JIRA
(v6.2#6252)