Posted to commits@samza.apache.org by "Yan Fang (JIRA)" <ji...@apache.org> on 2014/04/25 03:09:17 UTC

[jira] [Updated] (SAMZA-151) Fail early when a consumer is misconfigured

     [ https://issues.apache.org/jira/browse/SAMZA-151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yan Fang updated SAMZA-151:
---------------------------

    Attachment: SAMZA-151.patch

RB: https://reviews.apache.org/r/20697/

1. Wrapped the consumer registration in a try/catch block
2. Added SystemConsumerException, which extends SamzaException
3. Added a test that checks the correct exception is thrown
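
To make the first two changes concrete, here is a minimal sketch of the idea, not the contents of the attached patch. The SamzaException class below is a local stand-in so the snippet compiles on its own, and RegisterSketch, its register signature, and the map of consumers are hypothetical names used only for illustration.

```scala
// Local stand-in for illustration only; the real SamzaException lives in the Samza code base.
class SamzaException(msg: String, cause: Throwable) extends RuntimeException(msg, cause)

// Mirrors item 2 above: a dedicated exception type that extends SamzaException.
class SystemConsumerException(msg: String, cause: Throwable) extends SamzaException(msg, cause)

object RegisterSketch {
  // Hypothetical registry: system name -> function that registers a stream with that consumer.
  def register(consumers: Map[String, String => Unit], systemName: String, stream: String): Unit = {
    try {
      // Map.apply throws NoSuchElementException when no consumer exists for the system.
      consumers(systemName)(stream)
    } catch {
      case e: NoSuchElementException =>
        // Rethrow with a message that names the misconfigured system.
        throw new SystemConsumerException(
          s"No consumer available for system '$systemName'; check its system factory configuration.", e)
    }
  }
}
```

With this shape, a caller sees a SystemConsumerException that names the system instead of a bare "key not found" error. Whether that exception actually stops the container depends on how the caller handles it, which is the open question below.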

Question:
How can I fail the Samza container? SystemConsumerException mimics the KafkaCheckpointException approach from SAMZA-64, but it does not seem to terminate the Samza job when I tested it.

> Fail early when a consumer is misconfigured
> -------------------------------------------
>
>                 Key: SAMZA-151
>                 URL: https://issues.apache.org/jira/browse/SAMZA-151
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>              Labels: newbie
>         Attachments: SAMZA-151.patch
>
>
> Currently, if a SystemFactory fails to create a SystemConsumer and throws an exception, we log this error with:
> {code}
>             info("Failed to create a consumer for %s, so skipping." format systemName)
> {code}
> SamzaContainer then continues on. If the system with the failed consumer is configured to be used for an input stream or changelog, the lack of a SystemConsumer for the system will ABSOLUTELY result in an exception later.
> For example, if you have a system called "kafka", and task.inputs is set to "kafka.my-topic", but the KafkaSystemFactory fails to return a consumer, we will log the error and continue. The resulting exception that's thrown much later is:
> {noformat}
> java.util.NoSuchElementException: key not found: kafka
>         at scala.collection.MapLike$class.default(MapLike.scala:223)
>         at scala.collection.immutable.Map$EmptyMap$.default(Map.scala:73)
>         at scala.collection.MapLike$class.apply(MapLike.scala:134)
>         at scala.collection.immutable.Map$EmptyMap$.apply(Map.scala:73)
>         at org.apache.samza.system.SystemConsumers.register(SystemConsumers.scala:175)
>         at org.apache.samza.container.TaskInstance$$anonfun$registerConsumers$6.apply(TaskInstance.scala:156)
>         at org.apache.samza.container.TaskInstance$$anonfun$registerConsumers$6.apply(TaskInstance.scala:155)
>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
>         at org.apache.samza.container.TaskInstance.registerConsumers(TaskInstance.scala:155)
>         at org.apache.samza.container.SamzaContainer$$anonfun$startConsumers$2.apply(SamzaContainer.scala:544)
>         at org.apache.samza.container.SamzaContainer$$anonfun$startConsumers$2.apply(SamzaContainer.scala:544)
>         at scala.collection.MapLike$DefaultValuesIterable$$anonfun$foreach$4.apply(MapLike.scala:201)
>         at scala.collection.MapLike$DefaultValuesIterable$$anonfun$foreach$4.apply(MapLike.scala:201)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>         at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:201)
>         at org.apache.samza.container.SamzaContainer.startConsumers(SamzaContainer.scala:544)
>         at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:450)
>         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:78)
>         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> {noformat}
> This is a pretty bad experience. We should proactively check that the system that has a failed SystemConsumer is not used in either task.inputs or changelogs, and fail outright with better logs.
> On the producer-side, we can't do this because tasks can produce to any system they want (since it's a string). We simply fail when they produce to a non-existent system.
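
The proactive check proposed in the description could look roughly like the sketch below. It is only an illustration under stated assumptions: ConsumerValidationSketch and its methods are hypothetical, the inputs and changelogs are assumed to be already parsed into "system.stream" strings such as "kafka.my-topic", and IllegalStateException stands in for whatever exception type the container would really use.

```scala
object ConsumerValidationSketch {
  // Extracts the system name from a "system.stream" entry, e.g. "kafka.my-topic" -> "kafka".
  def systemOf(systemStream: String): String = systemStream.takeWhile(_ != '.')

  // Systems referenced by inputs or changelogs for which no consumer was created.
  def missingSystems(inputs: Set[String], changelogs: Set[String], consumers: Set[String]): Set[String] =
    (inputs ++ changelogs).map(systemOf) -- consumers

  // Fails outright, before any registration happens, if a required consumer is missing.
  def validate(inputs: Set[String], changelogs: Set[String], consumers: Set[String]): Unit = {
    val missing = missingSystems(inputs, changelogs, consumers)
    if (missing.nonEmpty) {
      throw new IllegalStateException(
        "No SystemConsumer for system(s) used by task.inputs or changelogs: " + missing.toSeq.sorted.mkString(", "))
    }
  }
}
```

Running validate right after the consumers are created would replace the late NoSuchElementException with an immediate error that names the affected systems. Systems that are only produced to are not checked, in line with the producer-side note above.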



--
This message was sent by Atlassian JIRA
(v6.2#6252)