Posted to commits@samza.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2014/05/28 23:03:03 UTC

[jira] [Commented] (SAMZA-267) OffsetManager fails if a checkpointed topic isn't in task.inputs

    [ https://issues.apache.org/jira/browse/SAMZA-267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14011579#comment-14011579 ] 

Chris Riccomini commented on SAMZA-267:
---------------------------------------

We need to decide whether to keep outdated stream offsets in the checkpoint. If we keep them, developers can switch task.inputs back later and resume where they left off with the old stream. If we strip all outdated streams (those no longer in task.inputs) out of the checkpoint, then once a new checkpoint is written, switching back to a previously consumed stream will fall back to the samza.offset.default setting.

I think there are use cases for both behaviors, but I don't want to add yet-another-config to control this. The easiest fix is to just strip out all outdated streams from systemStreamPartitions and lastProcessedOffsets. This would result in stripping all outdated streams from future checkpoints.
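As a rough illustration of the "strip everything outdated" option, the filter could look something like the sketch below. It is not the actual OffsetManager code: the case classes are simplified stand-ins for Samza's SystemStream and SystemStreamPartition, offsets are assumed to be plain strings, and the object and method names are hypothetical.

```scala
object StripOutdatedSketch {
  // Simplified stand-ins for Samza's classes (assumption, not the real API).
  case class SystemStream(system: String, stream: String)
  case class SystemStreamPartition(systemStream: SystemStream, partition: Int)

  /** Keeps only the offsets whose stream is still listed in task.inputs. */
  def stripOutdated(
      lastProcessedOffsets: Map[SystemStreamPartition, String],
      taskInputs: Set[SystemStream]): Map[SystemStreamPartition, String] =
    lastProcessedOffsets.filter { case (ssp, _) =>
      taskInputs.contains(ssp.systemStream)
    }
}
```

With a checkpoint containing both foo and bar and task.inputs=bar, only the bar entry would survive, so the next checkpoint written would no longer mention foo.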

The other way to handle this is to fix each individual breakage. There are two places in the OffsetManager where breakage is introduced when input streams are switched.

# getSystemStreamPartitionsToReset
# loadDefaults

For getSystemStreamPartitionsToReset, we could log and ignore outdated streams. For loadDefaults, we could just use the global default (OffsetType.UPCOMING).
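A minimal sketch of these two targeted fixes follows. Everything here is a simplified stand-in for illustration only: the case classes, the OffsetSetting shape, the OffsetType values, and the method names partitionsToReset and defaultOffsetFor are assumptions, not the real OffsetManager signatures.

```scala
object TolerantOffsetSketch {
  // Simplified stand-ins for Samza's classes (assumption, not the real API).
  case class SystemStream(system: String, stream: String)
  case class SystemStreamPartition(systemStream: SystemStream, partition: Int)

  // Stand-in for the offset default options; only the names matter here.
  sealed trait OffsetType
  case object Upcoming extends OffsetType
  case object Oldest extends OffsetType

  // Hypothetical per-stream settings, as they might be read from config.
  case class OffsetSetting(defaultOffset: OffsetType, resetOffset: Boolean)

  /** Returns partitions to reset; streams without settings are logged and skipped. */
  def partitionsToReset(
      checkpointed: Set[SystemStreamPartition],
      settings: Map[SystemStream, OffsetSetting],
      warn: String => Unit = msg => println("WARN " + msg)): Set[SystemStreamPartition] =
    checkpointed.filter { ssp =>
      settings.get(ssp.systemStream) match {
        case Some(setting) => setting.resetOffset
        case None =>
          // Previously this case threw a SamzaException and killed the container.
          warn("Ignoring checkpointed stream without offset settings: " + ssp.systemStream)
          false
      }
    }

  /** Falls back to the global default (upcoming) when a stream has no settings. */
  def defaultOffsetFor(
      stream: SystemStream,
      settings: Map[SystemStream, OffsetSetting]): OffsetType =
    settings.get(stream).map(_.defaultOffset).getOrElse(Upcoming)
}
```

Unlike stripping, this approach leaves the outdated offsets untouched, so a later switch back to the old stream could still resume from them.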

> OffsetManager fails if a checkpointed topic isn't in task.inputs
> ----------------------------------------------------------------
>
>                 Key: SAMZA-267
>                 URL: https://issues.apache.org/jira/browse/SAMZA-267
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>             Fix For: 0.7.0
>
>
> If you run a job with task.inputs=foo, let the job checkpoint, then restart it with task.inputs=bar, the last checkpoint will have foo in it. This will cause the OffsetManager to fail with:
> {noformat}
> 2014-05-15 12:16:28 SamzaContainer [ERROR] Caught exception in process loop.
> org.apache.samza.SamzaException: Attempting to reset a stream that doesn't have offset settings SystemStream [system=kafka, stream=foo].
>     at org.apache.samza.checkpoint.OffsetManager$$anonfun$getSystemStreamPartitionsToReset$1$$anonfun$apply$5.apply(OffsetManager.scala:305)
>     at org.apache.samza.checkpoint.OffsetManager$$anonfun$getSystemStreamPartitionsToReset$1$$anonfun$apply$5.apply(OffsetManager.scala:305)
>     at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>     at scala.collection.AbstractMap.getOrElse(Map.scala:58)
>     at org.apache.samza.checkpoint.OffsetManager$$anonfun$getSystemStreamPartitionsToReset$1.apply(OffsetManager.scala:305)
>     at org.apache.samza.checkpoint.OffsetManager$$anonfun$getSystemStreamPartitionsToReset$1.apply(OffsetManager.scala:302)
>     at scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
>     at scala.collection.TraversableLike$class.filter(TraversableLike.scala:263)
>     at scala.collection.AbstractTraversable.filter(Traversable.scala:105)
>     at org.apache.samza.checkpoint.OffsetManager.getSystemStreamPartitionsToReset(OffsetManager.scala:302)
>     at org.apache.samza.checkpoint.OffsetManager.stripResetStreams(OffsetManager.scala:287)
>     at org.apache.samza.checkpoint.OffsetManager.start(OffsetManager.scala:165)
>     at org.apache.samza.container.SamzaContainer.startOffsetManager(SamzaContainer.scala:558)
>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:492)
>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:82)
>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> {noformat}
> We should just warn in this case, rather than fail the container.


