Posted to commits@samza.apache.org by "Yan Fang (JIRA)" <ji...@apache.org> on 2014/04/25 23:33:15 UTC

[jira] [Commented] (SAMZA-151) Fail early when a consumer is misconfigured

    [ https://issues.apache.org/jira/browse/SAMZA-151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13981642#comment-13981642 ] 

Yan Fang commented on SAMZA-151:
--------------------------------

Hi Jakob,

Thank you for reviewing! I updated the patch per your comments: https://reviews.apache.org/r/20697/

After double-checking, I figured it out: the exception DOES fail the Samza container. I had expected it to fail the AM as well. It turns out the AM is only killed after several container failures, so the code works as intended.

Thank you.


> Fail early when a consumer is misconfigured
> -------------------------------------------
>
>                 Key: SAMZA-151
>                 URL: https://issues.apache.org/jira/browse/SAMZA-151
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Yan Fang
>              Labels: newbie
>         Attachments: SAMZA-151.1.patch, SAMZA-151.patch
>
>
> Currently, if a SystemFactory fails to create a SystemConsumer and throws an exception, we log this error with:
> {code}
>             info("Failed to create a consumer for %s, so skipping." format systemName)
> {code}
> SamzaContainer then continues on. If the system whose consumer failed is used by an input stream or a changelog, the missing SystemConsumer for that system will ALWAYS result in an exception later.
> For example, if you have a system called "kafka", and task.inputs is set to "kafka.my-topic", but the KafkaSystemFactory fails to return a consumer, we will log the error and continue. The resulting exception that's thrown much later is:
> {noformat}
> java.util.NoSuchElementException: key not found: kafka
>         at scala.collection.MapLike$class.default(MapLike.scala:223)
>         at scala.collection.immutable.Map$EmptyMap$.default(Map.scala:73)
>         at scala.collection.MapLike$class.apply(MapLike.scala:134)
>         at scala.collection.immutable.Map$EmptyMap$.apply(Map.scala:73)
>         at org.apache.samza.system.SystemConsumers.register(SystemConsumers.scala:175)
>         at org.apache.samza.container.TaskInstance$$anonfun$registerConsumers$6.apply(TaskInstance.scala:156)
>         at org.apache.samza.container.TaskInstance$$anonfun$registerConsumers$6.apply(TaskInstance.scala:155)
>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
>         at org.apache.samza.container.TaskInstance.registerConsumers(TaskInstance.scala:155)
>         at org.apache.samza.container.SamzaContainer$$anonfun$startConsumers$2.apply(SamzaContainer.scala:544)
>         at org.apache.samza.container.SamzaContainer$$anonfun$startConsumers$2.apply(SamzaContainer.scala:544)
>         at scala.collection.MapLike$DefaultValuesIterable$$anonfun$foreach$4.apply(MapLike.scala:201)
>         at scala.collection.MapLike$DefaultValuesIterable$$anonfun$foreach$4.apply(MapLike.scala:201)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>         at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:201)
>         at org.apache.samza.container.SamzaContainer.startConsumers(SamzaContainer.scala:544)
>         at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:450)
>         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:78)
>         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> {noformat}
> This is a pretty bad experience. We should proactively check whether a system with a failed SystemConsumer is used in task.inputs or in changelogs, and if it is, fail outright with a clear log message.
> On the producer side, we can't do this because tasks can produce to any system they want (the system name is just a string). We simply fail when a task produces to a nonexistent system.
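For illustration, the proposed fail-early check might look roughly like the Scala sketch below. It is an assumption-laden sketch, not the code in SAMZA-151.patch: the names ConsumerCheck, StreamRef, parseStreams and verifyConsumers are hypothetical, and AnyRef stands in for SystemConsumer so the block compiles on its own. The idea is to collect every system referenced by task.inputs or a changelog and throw as soon as one of them has no consumer.

```scala
// Hypothetical sketch of a fail-early consumer check; not Samza's actual code.
object ConsumerCheck {
  // Hypothetical, simplified stand-in for a "system.stream" reference
  // such as "kafka.my-topic" from task.inputs.
  final case class StreamRef(system: String, stream: String)

  // Parses a comma-separated list of "system.stream" names; the system
  // name is everything before the first dot.
  def parseStreams(value: String): Set[StreamRef] =
    value.split(",").map(_.trim).filter(_.nonEmpty).map { name =>
      val dot = name.indexOf('.')
      require(dot > 0 && dot < name.length - 1,
        s"Malformed stream '$name'; expected system.stream")
      StreamRef(name.substring(0, dot), name.substring(dot + 1))
    }.toSet

  // Throws immediately if any system used by an input or changelog stream
  // has no consumer, instead of letting a later Map lookup fail with
  // NoSuchElementException. AnyRef is a placeholder for SystemConsumer.
  def verifyConsumers(
      consumers: Map[String, AnyRef],
      inputs: Set[StreamRef],
      changelogs: Set[StreamRef]): Unit = {
    val required = (inputs ++ changelogs).map(_.system)
    val missing = required -- consumers.keySet
    if (missing.nonEmpty) {
      val names = missing.toSeq.sorted.mkString(", ")
      throw new IllegalStateException(
        s"No SystemConsumer for system(s) [$names], which are used by " +
          "task.inputs or a changelog. Check the SystemFactory for these systems.")
    }
  }
}
```

Calling something like verifyConsumers right after the consumers are created would make the container exit at startup with a message naming the "kafka" system, rather than surfacing the NoSuchElementException from SystemConsumers.register much later.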



--
This message was sent by Atlassian JIRA
(v6.2#6252)