Posted to commits@samza.apache.org by "József Márton Jung (JIRA)" <ji...@apache.org> on 2015/06/04 09:06:38 UTC

[jira] [Assigned] (SAMZA-572) SamzaContainer checkpoints and windows immediately on startup

     [ https://issues.apache.org/jira/browse/SAMZA-572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

József Márton Jung reassigned SAMZA-572:
----------------------------------------

    Assignee: József Márton Jung

> SamzaContainer checkpoints and windows immediately on startup
> -------------------------------------------------------------
>
>                 Key: SAMZA-572
>                 URL: https://issues.apache.org/jira/browse/SAMZA-572
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.9.0
>            Reporter: Chris Riccomini
>            Assignee: József Márton Jung
>              Labels: newbie++
>             Fix For: 0.10.0
>
>
> I noticed that the SamzaContainer checkpoints immediately on startup. This is fairly useless, since at that point it has processed at most one message. The issue is that the last window and last commit timestamps are both initialized to 0:
> {code}
>   private var lastWindowMs = 0L
>   private var lastCommitMs = 0L
> {code}
> The logic that's triggered after the RunLoop starts is:
> {code}
>       if (commitMs >= 0 && lastCommitMs + commitMs < clock()) {
> {code}
> Of course this is always true on the first iteration: when lastCommitMs starts at 0, lastCommitMs + commitMs is just commitMs, which is far smaller than the current time returned by clock(). The same check is applied for windowing.
> It seems like we should set lastCommitMs/lastWindowMs to clock() if they're 0 (or just set them to clock() on instantiation).
> I noticed this by running a kafka-console-consumer against hello-samza. When the job started, I saw:
> {noformat}
>   {"type":"changelog-partition-mapping"}	{"Partition 0":0}
>   {"systemstreampartition-grouper-factory":"org.apache.samza.container.grouper.stream.GroupByPartitionFactory","taskName":"Partition 0","type":"checkpoint"}	{}
>   {"systemstreampartition-grouper-factory":"org.apache.samza.container.grouper.stream.GroupByPartitionFactory","taskName":"Partition 0","type":"checkpoint"}	{"SystemStreamPartition [kafka, wikipedia-raw, 0]":{"system":"kafka","partition":"0","offset":"179","stream":"wikipedia-raw"}}
> {noformat}
> The first (empty) checkpoint was written immediately on startup, before any messages had been processed. Because the job was being started for the first time, OffsetManager.lastProcessedOffsets was empty, so the container immediately checkpointed an empty map. This shouldn't result in data loss; it's just annoying and useless.
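
The trigger condition quoted in the report can be isolated as a pure function to make the arithmetic concrete. This is a minimal sketch, not Samza's actual RunLoop code. The object name `TriggerCheck`, the function `shouldTrigger`, the 60-second interval, and the assumption that the clock returns epoch milliseconds are all hypothetical. When the last timestamp starts at 0, the condition reduces to `intervalMs < nowMs`, which holds at any realistic startup time.

```scala
// Hypothetical helper mirroring the quoted check:
//   commitMs >= 0 && lastCommitMs + commitMs < clock()
object TriggerCheck {
  def shouldTrigger(lastMs: Long, intervalMs: Long, nowMs: Long): Boolean =
    intervalMs >= 0 && lastMs + intervalMs < nowMs

  def main(args: Array[String]): Unit = {
    val commitMs = 60000L            // assumed commit interval: 60 s
    val startupNow = 1433376000000L  // assumed clock() at startup: 2015-06-04 00:00:00 UTC, epoch ms

    // lastCommitMs initialized to 0: the check passes on the very first iteration.
    println(shouldTrigger(0L, commitMs, startupNow))         // true

    // lastCommitMs initialized to the startup time: no commit until commitMs has elapsed.
    println(shouldTrigger(startupNow, commitMs, startupNow)) // false
  }
}
```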

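One way to apply the fix suggested in the report (setting the timestamps to clock() on instantiation) is sketched below. `TimedTriggers`, `windowDue`, and `commitDue` are hypothetical names for a simplified stand-in, not the real SamzaContainer or RunLoop API. The injectable `clock` parameter mirrors the clock() call in the quoted condition, so the behavior can be checked with a fake clock.

```scala
// Hypothetical, simplified stand-in for the RunLoop timing logic (not Samza's actual class).
class TimedTriggers(windowMs: Long, commitMs: Long, clock: () => Long) {
  // Start both timers at the current time instead of 0L, so the first
  // window/commit fires only after a full interval has elapsed.
  private var lastWindowMs: Long = clock()
  private var lastCommitMs: Long = clock()

  /** True if a window is due; resets the window timer when it fires. */
  def windowDue(): Boolean = {
    val now = clock()
    val due = windowMs >= 0 && lastWindowMs + windowMs < now
    if (due) lastWindowMs = now
    due
  }

  /** True if a commit (checkpoint) is due; resets the commit timer when it fires. */
  def commitDue(): Boolean = {
    val now = clock()
    val due = commitMs >= 0 && lastCommitMs + commitMs < now
    if (due) lastCommitMs = now
    due
  }
}

object TimedTriggersDemo {
  def main(args: Array[String]): Unit = {
    var now = 1000L // fake clock, in milliseconds
    val triggers = new TimedTriggers(windowMs = 500L, commitMs = 2000L, clock = () => now)

    println(triggers.commitDue()) // false: container just started
    now = 3001L
    println(triggers.commitDue()) // true: the 2000 ms interval has elapsed
    println(triggers.commitDue()) // false: the timer was reset
  }
}
```

Initializing at construction avoids re-checking for 0 on every loop iteration. The report's other option (set them to clock() if they're still 0) would behave the same way for the first interval.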


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)