Posted to commits@kafka.apache.org by gw...@apache.org on 2015/10/30 22:54:29 UTC

kafka git commit: KAFKA-2714: Added integration tests for exceptional cases in fetching

Repository: kafka
Updated Branches:
  refs/heads/trunk d9ae33d4c -> efdc2ad2e


KAFKA-2714: Added integration tests for exceptional cases in fetching

Author: Anna Povzner <an...@confluent.io>

Reviewers: Jason Gustafson

Closes #393 from apovzner/cpkafka-84


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/efdc2ad2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/efdc2ad2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/efdc2ad2

Branch: refs/heads/trunk
Commit: efdc2ad2e7ec62c6d4d72c8d672a2995be5032a0
Parents: d9ae33d
Author: Anna Povzner <an...@confluent.io>
Authored: Fri Oct 30 14:54:18 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Fri Oct 30 14:54:18 2015 -0700

----------------------------------------------------------------------
 .../kafka/api/PlaintextConsumerTest.scala       | 54 ++++++++++++++++++++
 1 file changed, 54 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/efdc2ad2/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index eb67599..6c7a653 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -16,8 +16,10 @@ import java.util.regex.Pattern
 
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.{NoOffsetForPartitionException, OffsetAndMetadata, KafkaConsumer, ConsumerConfig}
+import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordTooLargeException}
 import org.junit.Assert._
 import org.junit.Test
 import scala.collection.JavaConverters
@@ -312,4 +314,56 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumeAndVerifyRecords(this.consumers(0), 5, 5)
   }
 
+  @Test
+  def testFetchInvalidOffset() {
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+
+    // produce two records
+    val totalRecords = 2
+    sendRecords(totalRecords, tp)
+    consumer0.assign(List(tp).asJava)
+
+    // poll should fail because there is no offset reset strategy set
+    intercept[NoOffsetForPartitionException] {
+      consumer0.poll(50)
+    }
+
+    // seek to an out-of-range position
+    val outOfRangePos = totalRecords + 1
+    consumer0.seek(tp, outOfRangePos)
+    val e = intercept[OffsetOutOfRangeException] {
+      consumer0.poll(20000)
+    }
+    val outOfRangePartitions = e.offsetOutOfRangePartitions()
+    assertNotNull(outOfRangePartitions)
+    assertEquals(1, outOfRangePartitions.size)
+    assertEquals(outOfRangePos.toLong, outOfRangePartitions.get(tp))
+
+    consumer0.close()
+  }
+
+  @Test
+  def testFetchRecordTooLarge() {
+    val maxFetchBytes = 10 * 1024
+    this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxFetchBytes.toString)
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+
+    // produce a record that is larger than the configured fetch size
+    val record = new ProducerRecord[Array[Byte], Array[Byte]](tp.topic(), tp.partition(), "key".getBytes, new Array[Byte](maxFetchBytes + 1))
+    this.producers(0).send(record)
+
+    // consuming a too-large record should fail
+    consumer0.assign(List(tp).asJava)
+    val e = intercept[RecordTooLargeException] {
+      consumer0.poll(20000)
+    }
+    val oversizedPartitions = e.recordTooLargePartitions()
+    assertNotNull(oversizedPartitions)
+    assertEquals(1, oversizedPartitions.size)
+    // the oversized message is at offset 0
+    assertEquals(0L, oversizedPartitions.get(tp))
+
+    consumer0.close()
+  }
 }
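
----------------------------------------------------------------------

For readers following along: the exceptions asserted above are meant to surface to client code, which can recover from each of them. Below is a minimal consumer sketch (not part of this commit) showing one way an application might react to the failure modes these tests exercise. The broker address, group id, topic name, and object name FetchErrorHandlingSketch are hypothetical; the exception accessors offsetOutOfRangePartitions() and recordTooLargePartitions() are the ones the tests themselves assert on.

    import java.util.{Collections, Properties}

    import scala.collection.JavaConverters._

    import org.apache.kafka.clients.consumer.{KafkaConsumer, NoOffsetForPartitionException}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordTooLargeException}
    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    object FetchErrorHandlingSketch extends App {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")   // hypothetical broker address
      props.put("group.id", "fetch-error-sketch")        // hypothetical group id
      props.put("auto.offset.reset", "none")             // fail fast instead of silently resetting
      props.put("max.partition.fetch.bytes", "10240")

      val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](
        props, new ByteArrayDeserializer, new ByteArrayDeserializer)
      val tp = new TopicPartition("sketch-topic", 0)     // hypothetical topic
      consumer.assign(Collections.singletonList(tp))

      try {
        val records = consumer.poll(1000)
        println(s"fetched ${records.count} records")
      } catch {
        case _: NoOffsetForPartitionException =>
          // No committed offset and reset policy "none": pick a start position explicitly.
          consumer.seek(tp, 0L)
        case e: OffsetOutOfRangeException =>
          // The fetch position fell outside the log: rewind each partition the broker reported.
          for ((part, _) <- e.offsetOutOfRangePartitions.asScala)
            consumer.seek(part, 0L)
        case e: RecordTooLargeException =>
          // A record exceeded max.partition.fetch.bytes: seek past it (that record is lost).
          for ((part, offset) <- e.recordTooLargePartitions.asScala)
            consumer.seek(part, offset.longValue + 1)
      } finally {
        consumer.close()
      }
    }

Note that seeking past the offset reported by recordTooLargePartitions() discards the oversized record; the non-lossy alternative is to restart the consumer with max.partition.fetch.bytes raised above the largest expected message.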