Posted to commits@samza.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2015/03/31 22:36:53 UTC

[jira] [Created] (SAMZA-623) Samza torture test Checker has a race condition

Chris Riccomini created SAMZA-623:
-------------------------------------

             Summary: Samza torture test Checker has a race condition
                 Key: SAMZA-623
                 URL: https://issues.apache.org/jira/browse/SAMZA-623
             Project: Samza
          Issue Type: Bug
          Components: test
    Affects Versions: 0.9.0
            Reporter: Chris Riccomini
             Fix For: 0.10.0


The SAMZA-394 integration test includes a {{Checker}} class. This class has a race condition that causes the test to fail intermittently. The bug is in this code:

{code}
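      // Emit the next epoch to every partition of the "epoch" stream.
      for(int i = 0; i < numPartitions; i++) {
          logger.info("Emitting next epoch - " + Integer.toString(i) + " -> " + Integer.toString(nextEpoch));
          collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "epoch"), Integer.toString(i), Integer.toString(nextEpoch)));
      }
      // Race window: a crash at this point leaves nextEpoch messages in the
      // stream while the store still holds the previous epoch.
      this.store.put(CURRENT_EPOCH, Integer.toString(nextEpoch));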
{code}

If the container is killed (or fails) right before the this.store.put() call, the epoch will already have been advanced (i.e., there will be messages in the stream with nextEpoch as their epoch), but the store will still hold the prior epoch.
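
To make the failure window concrete, here is a minimal sketch that reproduces the ordering outside Samza. It assumes in-memory stand-ins for the Kafka "epoch" stream and the task's key-value store; the class EpochRaceDemo, the advanceEpoch() helper, its crashBeforePut flag, and the "current-epoch" key value are all hypothetical and not taken from the Checker source.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class EpochRaceDemo {
  static final String CURRENT_EPOCH = "current-epoch"; // hypothetical key value

  // Hypothetical in-memory stand-ins: (partition, epoch) messages for the
  // "epoch" stream, and a map for the task's key-value store.
  final List<int[]> epochStream = new ArrayList<>();
  final Map<String, String> store = new HashMap<>();

  // Same ordering as the Checker snippet: send to every partition, then persist.
  // crashBeforePut simulates the container dying between the two steps.
  void advanceEpoch(int numPartitions, int nextEpoch, boolean crashBeforePut) {
    for (int i = 0; i < numPartitions; i++) {
      epochStream.add(new int[] {i, nextEpoch});
    }
    if (crashBeforePut) {
      return; // simulated kill: the store.put() below is never reached
    }
    store.put(CURRENT_EPOCH, Integer.toString(nextEpoch));
  }

  // Highest epoch already visible to consumers of the stream.
  int maxEpochInStream() {
    int max = -1;
    for (int[] msg : epochStream) {
      max = Math.max(max, msg[1]);
    }
    return max;
  }

  public static void main(String[] args) {
    EpochRaceDemo demo = new EpochRaceDemo();
    demo.store.put(CURRENT_EPOCH, "0");
    demo.advanceEpoch(2, 1, true);
    // The stream already carries epoch 1, but the store still says 0.
    System.out.println("stream epoch = " + demo.maxEpochInStream()
        + ", store epoch = " + demo.store.get(CURRENT_EPOCH));
  }
}
{code}

Running main() prints "stream epoch = 1, store epoch = 0": the stream and the store disagree, which is exactly the state the restarted container inherits.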

When the container starts back up, it falls back to its last checkpointed offset and resumes reading messages. If it reaches the messages carrying nextEpoch before window() is invoked again, checkEpoch compares nextEpoch against the epoch still recorded in the store (nextEpoch - 1). This causes the container to fail:

{code}
      if (currentEpochInMsg <= currentEpochInStore) {
        if (currentEpochInMsg < currentEpochInStore)
          logger.info("#### Ignoring received epoch = " + epoch + " less than what is in store " + curr);
      } else { // expected curr >= epoch; a newer epoch in the message is fatal
        throw new IllegalArgumentException("Got epoch " + epoch + " but have not yet completed " + curr);
      }
{code}

The container then fails over and over, as long as it consistently reads the nextEpoch message before window() is called (with task.window.ms=60000, window() runs only once a minute).
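
The restart loop can be sketched the same way. This hypothetical ReplayFailureDemo assumes that every restart restores the same store contents and replays the same nextEpoch message before window() fires; under that assumption the comparison from checkEpoch fails on every attempt, not just once. The class, the failedRestarts() helper, and the "current-epoch" key value are illustrative only.

{code:java}
import java.util.HashMap;
import java.util.Map;

class ReplayFailureDemo {
  static final String CURRENT_EPOCH = "current-epoch"; // hypothetical key value

  // Same comparison as checkEpoch in the report: a message epoch at or below the
  // stored epoch is accepted (older ones are only logged); a newer one is fatal.
  static void checkEpoch(Map<String, String> store, int currentEpochInMsg) {
    int currentEpochInStore = Integer.parseInt(store.get(CURRENT_EPOCH));
    if (currentEpochInMsg > currentEpochInStore) {
      throw new IllegalArgumentException(
          "Got epoch " + currentEpochInMsg + " but have not yet completed " + currentEpochInStore);
    }
  }

  // Hypothetical restart loop: every attempt restores the same store contents and
  // replays the same epoch message from the checkpoint before window() can run.
  static int failedRestarts(int attempts, int storedEpoch, int epochInReplayedMsg) {
    int failures = 0;
    for (int attempt = 0; attempt < attempts; attempt++) {
      Map<String, String> store = new HashMap<>();
      store.put(CURRENT_EPOCH, Integer.toString(storedEpoch));
      try {
        checkEpoch(store, epochInReplayedMsg);
      } catch (IllegalArgumentException e) {
        failures++; // container dies and is restarted with identical state
      }
    }
    return failures;
  }

  public static void main(String[] args) {
    System.out.println("failures after 5 restarts: " + failedRestarts(5, 0, 1));
  }
}
{code}

Under these assumptions failedRestarts(5, 0, 1) returns 5, while failedRestarts(5, 1, 1) returns 0, since a store that had recorded the new epoch would accept the message.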

I think this can be fixed simply by running the window() check one more time in checkEpoch when we are about to throw the exception.
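
A minimal sketch of that proposal follows. It is illustrative only: CheckerFixSketch, windowCheck(), and the currentEpochComplete supplier are hypothetical stand-ins for the completion test that window() performs before advancing the epoch, and the sketch assumes that re-running that check after a restart is safe (the real window() would also re-send the nextEpoch messages, duplicating the ones already in the stream).

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.BooleanSupplier;

class CheckerFixSketch {
  static final String CURRENT_EPOCH = "current-epoch"; // hypothetical key value

  final Map<String, String> store = new HashMap<>();
  // Hypothetical stand-in for the "is the stored epoch complete?" test done by window().
  final BooleanSupplier currentEpochComplete;

  CheckerFixSketch(int storedEpoch, BooleanSupplier currentEpochComplete) {
    this.store.put(CURRENT_EPOCH, Integer.toString(storedEpoch));
    this.currentEpochComplete = currentEpochComplete;
  }

  int storedEpoch() {
    return Integer.parseInt(store.get(CURRENT_EPOCH));
  }

  // Hypothetical stand-in for window(): if the stored epoch is complete, advance it.
  // The real window() would also re-send the nextEpoch messages before the put.
  void windowCheck() {
    if (currentEpochComplete.getAsBoolean()) {
      store.put(CURRENT_EPOCH, Integer.toString(storedEpoch() + 1));
    }
  }

  void checkEpoch(int currentEpochInMsg) {
    if (currentEpochInMsg > storedEpoch()) {
      // About to throw: the store may lag because the container died between
      // sending the epoch messages and store.put(), so run the window check once more.
      windowCheck();
    }
    if (currentEpochInMsg > storedEpoch()) {
      throw new IllegalArgumentException(
          "Got epoch " + currentEpochInMsg + " but have not yet completed " + storedEpoch());
    }
  }

  public static void main(String[] args) {
    CheckerFixSketch recovered = new CheckerFixSketch(0, () -> true);
    recovered.checkEpoch(1);
    System.out.println("store epoch after recovery = " + recovered.storedEpoch());
  }
}
{code}

With a completed epoch in the store, checkEpoch(1) advances the store to 1 instead of throwing; if the stored epoch is genuinely incomplete, the original IllegalArgumentException is still raised.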



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)