Posted to commits@samza.apache.org by "Prateek Maheshwari (JIRA)" <ji...@apache.org> on 2018/07/27 18:55:00 UTC

[jira] [Resolved] (SAMZA-1728) BootstrappingChooser: Call checkOffset only for a lagging partition while choosing.

     [ https://issues.apache.org/jira/browse/SAMZA-1728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Prateek Maheshwari resolved SAMZA-1728.
---------------------------------------
    Resolution: Fixed

> BootstrappingChooser: Call checkOffset only for a lagging partition while choosing.
> -----------------------------------------------------------------------------------
>
>                 Key: SAMZA-1728
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1728
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Aditya
>            Assignee: Aditya
>            Priority: Major
>
> checkOffset appears to be called even for SSPs that have already finished bootstrapping. For such an SSP, systemStreamLagCounts is decremented again, while laggingSystemStreamPartitions is left unchanged because the SSP has already been removed from that set. As a result, the systemStream is removed from systemStreamLagCounts while some of its SSPs are still lagging, and a later lookup of that systemStream in systemStreamLagCounts finds no entry.
> if (comparatorResult != null && comparatorResult.intValue() >= 0) {
>   laggingSystemStreamPartitions -= systemStreamPartition
>   systemStreamLagCounts += systemStream -> (systemStreamLagCounts(systemStream) - 1)
>   if (systemStreamLagCounts(systemStream) == 0) {
>     // If the lag count is 0, then no partition for this stream is lagging
>     // (the stream has been fully caught up).
>     systemStreamLagCounts -= systemStream
>   }
> }
>
> This results in the following exception:
> java.util.NoSuchElementException: key not found: SystemStream [system=brooklin-espresso, stream=SampleBrooklinFunctionsIdentityProfileDS]
>  at scala.collection.MapLike$class.default(MapLike.scala:228)
>  at scala.collection.AbstractMap.default(Map.scala:58)
>  at scala.collection.MapLike$class.apply(MapLike.scala:141)
>  at scala.collection.AbstractMap.apply(Map.scala:58)
>  at org.apache.samza.system.chooser.BootstrappingChooser.org$apache$samza$system$chooser$BootstrappingChooser$$checkOffset(BootstrappingChooser.scala:281)
>  at org.apache.samza.system.chooser.BootstrappingChooser.choose(BootstrappingChooser.scala:204)
>  at org.apache.samza.system.chooser.DefaultChooser.choose(DefaultChooser.scala:294)
>  at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:210)
>  at org.apache.samza.task.AsyncRunLoop.chooseEnvelope(AsyncRunLoop.java:208)
>  at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:156)
>  at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:753)
>  at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:101)
>  at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:148)
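
The bookkeeping problem described above can be illustrated with a minimal, self-contained Scala sketch. This is not the committed patch: StreamId, PartitionId, LagTracker, and markCaughtUp are hypothetical stand-ins for SystemStream, SystemStreamPartition, and the relevant part of BootstrappingChooser. The sketch only shows one possible guard, which is to update the per-stream count only when the partition was still in the lagging set.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Samza's SystemStream and SystemStreamPartition.
final case class StreamId(system: String, stream: String)
final case class PartitionId(stream: StreamId, partition: Int)

// Hypothetical tracker mirroring the two structures named in the issue:
// a set of lagging partitions and a per-stream count of lagging partitions.
final class LagTracker(partitions: Set[PartitionId]) {
  private val lagging = mutable.Set[PartitionId]() ++= partitions
  private val lagCounts = mutable.Map[StreamId, Int]()
  partitions.groupBy(_.stream).foreach { case (s, ps) => lagCounts(s) = ps.size }

  // Record that a partition has caught up. The count is touched only if the
  // partition was still lagging, so a repeated call for the same partition
  // is a no-op and cannot remove the stream's entry early.
  def markCaughtUp(p: PartitionId): Unit = {
    if (lagging.remove(p)) {
      val remaining = lagCounts(p.stream) - 1
      if (remaining == 0) lagCounts -= p.stream
      else lagCounts(p.stream) = remaining
    }
  }

  def isLagging(stream: StreamId): Boolean = lagCounts.contains(stream)
  def laggingPartitions: Set[PartitionId] = lagging.toSet
}

object LagTrackerDemo {
  def main(args: Array[String]): Unit = {
    val s = StreamId("example-system", "example-stream")
    val tracker = new LagTracker(Set(PartitionId(s, 0), PartitionId(s, 1)))

    tracker.markCaughtUp(PartitionId(s, 0))
    tracker.markCaughtUp(PartitionId(s, 0)) // repeated call, ignored

    // Partition 1 is still lagging, so the stream must still be tracked.
    assert(tracker.isLagging(s))
    assert(tracker.laggingPartitions == Set(PartitionId(s, 1)))

    tracker.markCaughtUp(PartitionId(s, 1))
    assert(!tracker.isLagging(s))
  }
}
```

Without the membership check in markCaughtUp, the second call for partition 0 would drive the count to zero and drop the stream's entry while partition 1 is still lagging, which matches the failure mode in the description.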


