You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/08/04 11:30:43 UTC

kafka git commit: KAFKA-3875; Transient test failure: kafka.api.SslProducerSendTest.testSendNonCompressedMessageWithCreateTime

Repository: kafka
Updated Branches:
  refs/heads/trunk 2e3722a23 -> 6fb33afff


KAFKA-3875; Transient test failure: kafka.api.SslProducerSendTest.testSendNonCompressedMessageWithCreateTime

1. The IllegalStateException is actually thrown from testCloseWithZeroTimeoutFromSenderThread() due to a bug in the test. We call producer.close() in the callback, so once the first callback has run, any later callback that tries to produce more records hits an IllegalStateException because the producer is already closed. This only pollutes the output and doesn't fail the test. I fixed this by calling producer.send() only in the first callback.
2. It's not clear which test throws the TimeoutException, and the failure isn't reproducible locally. However, the error message in the TimeoutException is misleading, since a batch can expire for reasons other than waiting on metadata. I improved this by making the TimeoutException message state which timeout condition caused the batch to expire.
3. It's not clear what actually caused testSendNonCompressedMessageWithCreateTime() to fail. One thing I found is that since we set the linger time to Long.MaxValue and are sending small messages, the produced messages won't be drained until we call producer.close(10000L, TimeUnit.MILLISECONDS). Normally, 10 secs should be enough for the records to be sent. My only hypothesis is that since SSL is more expensive, 10 secs is occasionally still not enough. So, I bumped up the timeout from 10 secs to 20 secs.

Author: Jun Rao <ju...@gmail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #1703 from junrao/kafka-3875


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6fb33aff
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6fb33aff
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6fb33aff

Branch: refs/heads/trunk
Commit: 6fb33afff976e467bfa8e0b29eb82770a2a3aaec
Parents: 2e3722a
Author: Jun Rao <ju...@gmail.com>
Authored: Thu Aug 4 12:30:24 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Aug 4 12:30:24 2016 +0100

----------------------------------------------------------------------
 .../kafka/clients/producer/internals/RecordBatch.java | 14 ++++++++++----
 .../integration/kafka/api/BaseProducerSendTest.scala  | 11 +++++++----
 2 files changed, 17 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6fb33aff/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index e6cd68f..6706bfd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -142,17 +142,23 @@ public final class RecordBatch {
      */
     public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
         boolean expire = false;
+        String errorMessage = null;
 
-        if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
+        if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime)) {
             expire = true;
-        else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs)))
+            errorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
+        } else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs))) {
             expire = true;
-        else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs)))
+            errorMessage = (now - (this.createdMs + lingerMs)) + " ms has passed since batch creation plus linger time";
+        } else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs))) {
             expire = true;
+            errorMessage = (now - (this.lastAttemptMs + retryBackoffMs)) + " ms has passed since last attempt plus backoff time";
+        }
 
         if (expire) {
             this.records.close();
-            this.done(-1L, Record.NO_TIMESTAMP, new TimeoutException("Batch containing " + recordCount + " record(s) expired due to timeout while requesting metadata from brokers for " + topicPartition));
+            this.done(-1L, Record.NO_TIMESTAMP,
+                      new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + " due to " + errorMessage));
         }
 
         return expire;

http://git-wip-us.apache.org/repos/asf/kafka/blob/6fb33aff/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 8eaf827..b5a1284 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -224,7 +224,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
         val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, baseTimestamp + i, "key".getBytes, "value".getBytes)
         producer.send(record, callback)
       }
-      producer.close(10000L, TimeUnit.MILLISECONDS)
+      producer.close(20000L, TimeUnit.MILLISECONDS)
       assertEquals(s"Should have offset $numRecords but only successfully sent ${callback.offset}", numRecords, callback.offset)
     } finally {
       producer.close()
@@ -408,11 +408,12 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
 
     // Test closing from sender thread.
-    class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback {
+    class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]], sendRecords: Boolean) extends Callback {
       override def onCompletion(metadata: RecordMetadata, exception: Exception) {
         // Trigger another batch in accumulator before close the producer. These messages should
         // not be sent.
-        (0 until numRecords) map (i => producer.send(record))
+        if (sendRecords)
+          (0 until numRecords) foreach (i => producer.send(record))
         // The close call will be called by all the message callbacks. This tests idempotence of the close call.
         producer.close(0, TimeUnit.MILLISECONDS)
         // Test close with non zero timeout. Should not block at all.
@@ -423,7 +424,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
       try {
         // send message to partition 0
-        val responses = (0 until numRecords) map (i => producer.send(record, new CloseCallback(producer)))
+        // Only send the records in the first callback since we close the producer in the callback and no records
+        // can be sent afterwards.
+        val responses = (0 until numRecords) map (i => producer.send(record, new CloseCallback(producer, i == 0)))
         assertTrue("No request is complete.", responses.forall(!_.isDone()))
         // flush the messages.
         producer.flush()