Posted to commits@samza.apache.org by "Martin Kleppmann (JIRA)" <ji...@apache.org> on 2014/06/04 01:42:03 UTC

[jira] [Commented] (SAMZA-224) TestStatefulTask.testShouldStartAndRestore fails intermittently

    [ https://issues.apache.org/jira/browse/SAMZA-224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14017263#comment-14017263 ] 

Martin Kleppmann commented on SAMZA-224:
----------------------------------------

I think I have figured out what is happening here. It's a race condition between the producer and the consumer, a variation of SAMZA-166. Due to a bug, the consumer starts up at the latest offset, not the earliest offset. If the consumer starts before the message is sent (i.e. it starts consuming at offset 0), the test succeeds, but if the consumer starts after the message is sent (i.e. it starts consuming at offset 1), the message is never received.
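
To make the two orderings concrete, here is a toy model of the race. It is not Samza or Kafka code: {{Log}} and {{run}} are hypothetical names, and the "partition" is just an in-memory list. It only illustrates why a consumer that resolves its start position to the latest offset receives the message in one ordering and misses it in the other.

```python
class Log:
    """Hypothetical in-memory stand-in for a single topic partition."""

    def __init__(self):
        self.messages = []

    def send(self, message):
        self.messages.append(message)

    def earliest_offset(self):
        return 0

    def latest_offset(self):
        # The offset that the next message written will receive.
        return len(self.messages)

    def consume_from(self, offset):
        # Everything the consumer will ever see, given its start offset.
        return self.messages[offset:]


def run(consumer_starts_first, reset_to_latest):
    """Return (start_offset, received_messages) for one ordering of events."""
    log = Log()

    def resolve_start():
        return log.latest_offset() if reset_to_latest else log.earliest_offset()

    if consumer_starts_first:
        start = resolve_start()   # log is still empty, so latest == 0
        log.send("m0")
    else:
        log.send("m0")
        start = resolve_start()   # latest is already 1, so m0 is skipped
    return start, log.consume_from(start)


passing = run(consumer_starts_first=True, reset_to_latest=True)
failing = run(consumer_starts_first=False, reset_to_latest=True)
intended = run(consumer_starts_first=False, reset_to_latest=False)
```

With the consumer resetting to the latest offset, the result depends on the ordering; with a reset to the earliest offset (the configured intent), the message is received either way.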

The question is then why it is starting to consume from the latest offset, despite being configured to reset to the earliest.

When the OffsetManager in this test starts up, it fetches the metadata for the input topic. In both my passing and my failing tests, the metadata is as follows (indented for readability):

{noformat}
OffsetManager$ [DEBUG] Building offset manager for Map(
    SystemStream [system=kafka, stream=input] ->
        SystemStreamMetadata [streamName=input, partitionMetadata={
            Partition [partition=0]=SystemStreamPartitionMetadata [
                oldestOffset=null, newestOffset=null, upcomingOffset=0
            ]
        }]
    ).
{noformat}

The OffsetManager correctly sees the {{systems.kafka.samza.offset.default=oldest}} setting, so it picks {{oldestOffset}} from the metadata, which is {{null}}. That null offset gets passed through to KafkaSystemConsumer.register, and it turns into None when it gets to BrokerProxy.addTopicPartition. BrokerProxy then logs this charming warning:

{noformat}
BrokerProxy [WARN] It appears that we received an invalid or empty offset None for [input,0].
    Attempting to use Kafka's auto.offset.reset setting.
    This can result in data loss if processing continues.
{noformat}

BrokerProxy goes on to call GetOffset.getResetOffset, which returns "largest", so the consumer starts consuming from the latest offset rather than the earliest.
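
The chain of decisions can be sketched roughly as follows. This is a simplified model under stated assumptions, not the actual Samza code: {{pick_default_offset}} and {{resolve_start}} are hypothetical names standing in for the OffsetManager and BrokerProxy steps, the metadata is a plain dict mirroring the debug output above, and the broker's earliest and latest offsets are passed in as plain integers.

```python
def pick_default_offset(metadata, offset_default):
    """Model of the OffsetManager step: choose an offset from the partition
    metadata according to the configured default ("oldest" or "upcoming")."""
    key = {"oldest": "oldestOffset", "upcoming": "upcomingOffset"}[offset_default]
    return metadata[key]


def resolve_start(offset, auto_offset_reset, earliest, latest):
    """Model of the BrokerProxy step: a missing offset falls back to Kafka's
    auto.offset.reset setting ("smallest" or "largest")."""
    if offset is not None:
        return int(offset)
    return earliest if auto_offset_reset == "smallest" else latest


# Metadata for the empty input topic, as in the debug output above.
empty_topic = {"oldestOffset": None, "newestOffset": None, "upcomingOffset": "0"}

chosen = pick_default_offset(empty_topic, "oldest")   # None: the topic has no oldest message yet

# Consumer resolves its position before the message is sent: latest is 0.
start_before_send = resolve_start(chosen, "largest", earliest=0, latest=0)

# Consumer resolves its position after the message is sent: latest is 1.
start_after_send = resolve_start(chosen, "largest", earliest=0, latest=1)
```

In this model the "oldest" setting is honoured only as far as the metadata lookup; once the lookup yields no offset, the outcome is decided entirely by the reset setting and by when the fallback happens to run.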

So, whose fault is it? Is the SystemStreamMetadata bad because it contains nulls? Is the OffsetManager bad because it doesn't deal with the nulls? Is the BrokerProxy bad because it won't accept None? Would it be better to fail entirely rather than fall back to Kafka's auto.offset.reset setting?

> TestStatefulTask.testShouldStartAndRestore fails intermittently
> ---------------------------------------------------------------
>
>                 Key: SAMZA-224
>                 URL: https://issues.apache.org/jira/browse/SAMZA-224
>             Project: Samza
>          Issue Type: Bug
>          Components: test
>    Affects Versions: 0.6.0
>            Reporter: Martin Kleppmann
>            Assignee: Martin Kleppmann
>             Fix For: 0.7.0
>
>
> [~closeuris] reported the following issue on SAMZA-185. I'm now intermittently seeing this problem too, so I'm opening a new issue for it (as I'm not sure the issue has anything to do with the Kafka 0.8.1 upgrade).
> {noformat}
> org.apache.samza.test.integration.TestStatefulTask > testShouldStartAndRestore FAILED
>     java.lang.AssertionError at TestStatefulTask.scala:356
> 2 tests completed, 1 failed
> :samza-test_2.10:test FAILED
> {noformat}
> Logs provided by Yan (mine look similar):
> Failed tests: http://pastebin.com/3nEDw9jC
> Standard output: http://pastebin.com/20vZbk7c
> Standard error: http://pastebin.com/gU6cTNAr



--
This message was sent by Atlassian JIRA
(v6.2#6252)