Posted to commits@samza.apache.org by "Yi Pan (Data Infrastructure) (JIRA)" <ji...@apache.org> on 2015/04/03 22:19:54 UTC

[jira] [Created] (SAMZA-635) KafkaSystemProducer may got exception out-of-order

Yi Pan (Data Infrastructure) created SAMZA-635:
--------------------------------------------------

             Summary: KafkaSystemProducer may got exception out-of-order
                 Key: SAMZA-635
                 URL: https://issues.apache.org/jira/browse/SAMZA-635
             Project: Samza
          Issue Type: Bug
    Affects Versions: 0.9.0
            Reporter: Yi Pan (Data Infrastructure)


In the current KafkaSystemProducer design, a non-retriable exception can be thrown from the Kafka producer's send thread, which creates race conditions in the following code blocks:

{code}
82    sendFailed.set(false)
83
84    retryBackoff.run(
85      loop => {
86        if (sendFailed.get()) {
87          throw exceptionThrown.get()
88        }
{code}

And 

{code}
91            def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
92              if (exception == null) {
93                //send was successful. Don't retry
94                metrics.sendSuccess.inc
95              } else {
96                //If there is an exception in the callback, it means that the Kafka producer has exhausted the max-retries
97                //Hence, fail container!
98                exceptionThrown.compareAndSet(null, exception)
99                sendFailed.set(true)
100              }
101            }
{code}

The main thread sets and reads _sendFailed_ on lines 82 and 86, while the Kafka send thread sets it on line 99.

There are two possible race conditions here:
1) The Kafka send thread completes line 99, and then the main thread executes line 82, resetting the flag. The exception is missed.
2) The main thread finishes line 82 for the current message, and then the Kafka send thread executes line 99 for the previous message. In this case, the main thread gets an exception that belongs to the previous message, not the current one.
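To make the two interleavings concrete, here is a minimal, single-threaded Scala sketch that replays each one step by step. `SharedFailureState` and `RaceReplay` are hypothetical names, not part of Samza; the class only models the shared `sendFailed`/`exceptionThrown` pair from the excerpts above, and each method notes the line it mirrors. Running the steps in a fixed order stands in for the unlucky thread scheduling, and the check returns the exception instead of throwing it.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

// Hypothetical model of the state shared by every send in the producer:
// one failure flag and one "first exception" slot.
final class SharedFailureState {
  val sendFailed = new AtomicBoolean(false)
  val exceptionThrown = new AtomicReference[Exception](null)

  // Mirrors line 82: the main thread resets the flag before a send.
  def resetBeforeSend(): Unit = sendFailed.set(false)

  // Mirrors lines 86-87: returns the exception the main thread would throw, if any.
  def checkFailure(): Option[Exception] =
    if (sendFailed.get()) Some(exceptionThrown.get()) else None

  // Mirrors lines 98-99: the send thread records a failed send.
  def onSendFailure(e: Exception): Unit = {
    exceptionThrown.compareAndSet(null, e)
    sendFailed.set(true)
  }
}

object RaceReplay {
  // Race 1: the callback for message A fails (line 99) just before the main
  // thread resets the flag for message B (line 82), so this check sees nothing.
  def race1(): Option[Exception] = {
    val s = new SharedFailureState
    s.onSendFailure(new Exception("send of message A failed"))
    s.resetBeforeSend() // main thread starts sending message B
    s.checkFailure()    // None: the failure of A is not surfaced here
  }

  // Race 2: the main thread resets the flag for message B (line 82), then the
  // callback for the earlier message A fails (line 99).
  def race2(): Option[Exception] = {
    val s = new SharedFailureState
    s.resetBeforeSend() // main thread starts sending message B
    s.onSendFailure(new Exception("send of message A failed"))
    s.checkFailure()    // Some(A's exception), reported while handling B
  }

  def main(args: Array[String]): Unit = {
    println(s"race 1: ${race1()}")
    println(s"race 2: ${race2().map(_.getMessage)}")
  }
}
```

Because the failure is recorded in state shared across all sends, neither the flag nor the stored exception identifies which message actually failed.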

The following configuration can trigger this bug. Producer side (Samza job config):
{code}
systems.kafka.producer.max.request.size=102400
{code}

Broker side:
{code}
message.max.bytes=10240
{code}

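Then, inside task.process(), send a 16 KB message first, followed by a small message. The 16 KB message is within the producer's max.request.size (102400 bytes) but exceeds the broker's message.max.bytes (10240 bytes), so it passes the producer's size check and is rejected by the broker. That failure reaches onCompletion asynchronously, possibly while the main thread is already sending the small message.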
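The size arithmetic behind this reproduction can be checked with a short Scala sketch. It is deliberately simplified: `SizeLimits` and its outcome names are hypothetical, and the comparison ignores Kafka's per-message and per-request overhead, so results are only approximate near the limits.

```scala
object SizeLimits {
  // Values taken from the configuration above.
  val producerMaxRequestSize = 102400 // systems.kafka.producer.max.request.size
  val brokerMessageMaxBytes  = 10240  // broker message.max.bytes

  sealed trait Outcome
  case object RejectedByProducer extends Outcome // larger than the producer's limit
  case object RejectedByBroker   extends Outcome // passes the producer, exceeds the broker's limit
  case object Accepted           extends Outcome // within both limits

  // Simplified classification of a payload size in bytes (overhead ignored).
  def classify(payloadBytes: Int): Outcome =
    if (payloadBytes > producerMaxRequestSize) RejectedByProducer
    else if (payloadBytes > brokerMessageMaxBytes) RejectedByBroker
    else Accepted

  def main(args: Array[String]): Unit = {
    println(s"16 KB message: ${classify(16 * 1024)}")
    println(s"small message: ${classify(100)}")
  }
}
```

The 16 KB message (16384 bytes) falls in the middle band: the producer accepts it, so the failure only arrives later through the callback, which is what opens the window for the races described above.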



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)