You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/10/30 22:54:29 UTC
kafka git commit: KAFKA-2714: Added integration tests for exceptional
cases in fetching
Repository: kafka
Updated Branches:
refs/heads/trunk d9ae33d4c -> efdc2ad2e
KAFKA-2714: Added integration tests for exceptional cases in fetching
Author: Anna Povzner <an...@confluent.io>
Reviewers: Jason Gustafson
Closes #393 from apovzner/cpkafka-84
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/efdc2ad2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/efdc2ad2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/efdc2ad2
Branch: refs/heads/trunk
Commit: efdc2ad2e7ec62c6d4d72c8d672a2995be5032a0
Parents: d9ae33d
Author: Anna Povzner <an...@confluent.io>
Authored: Fri Oct 30 14:54:18 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Fri Oct 30 14:54:18 2015 -0700
----------------------------------------------------------------------
.../kafka/api/PlaintextConsumerTest.scala | 54 ++++++++++++++++++++
1 file changed, 54 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/efdc2ad2/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index eb67599..6c7a653 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -16,8 +16,10 @@ import java.util.regex.Pattern
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.{NoOffsetForPartitionException, OffsetAndMetadata, KafkaConsumer, ConsumerConfig}
+import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordTooLargeException}
import org.junit.Assert._
import org.junit.Test
import scala.collection.JavaConverters
@@ -312,4 +314,56 @@ class PlaintextConsumerTest extends BaseConsumerTest {
consumeAndVerifyRecords(this.consumers(0), 5, 5)
}
+ @Test
+ def testFetchInvalidOffset() { // exercises two poll() failure modes when auto.offset.reset is "none": no position at all, then an out-of-range position
+ this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+ val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+
+ // produce totalRecords (two) records to tp, then assign tp to the consumer manually
+ val totalRecords = 2
+ sendRecords(totalRecords, tp)
+ consumer0.assign(List(tp).asJava)
+
+ // poll should fail because there is no offset reset strategy set and no position has been chosen for tp yet
+ intercept[NoOffsetForPartitionException] {
+ consumer0.poll(50)
+ }
+
+ // seek to a position beyond the records produced above (totalRecords + 1) so the next fetch is out of range
+ val outOfRangePos = totalRecords + 1
+ consumer0.seek(tp, outOfRangePos)
+ val e = intercept[OffsetOutOfRangeException] {
+ consumer0.poll(20000)
+ }
+ val outOfRangePartitions = e.offsetOutOfRangePartitions()
+ assertNotNull(outOfRangePartitions)
+ assertEquals(1, outOfRangePartitions.size)
+ assertEquals(outOfRangePos.toLong, outOfRangePartitions.get(tp))
+
+ consumer0.close()
+ }
+
+ @Test
+ def testFetchRecordTooLarge() { // expects poll() to throw RecordTooLargeException when a record exceeds the per-partition fetch limit
+ val maxFetchBytes = 10 * 1024
+ this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxFetchBytes.toString)
+ val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+
+ // produce a record whose value alone (maxFetchBytes + 1 bytes) is larger than the configured fetch size
+ val record = new ProducerRecord[Array[Byte],Array[Byte]](tp.topic(), tp.partition(), "key".getBytes, new Array[Byte](maxFetchBytes + 1))
+ this.producers(0).send(record) // NOTE(review): the value returned by send is discarded, so the test does not wait on it -- confirm this is intended
+
+ // consuming a too-large record should fail
+ consumer0.assign(List(tp).asJava)
+ val e = intercept[RecordTooLargeException] {
+ consumer0.poll(20000)
+ }
+ val oversizedPartitions = e.recordTooLargePartitions()
+ assertNotNull(oversizedPartitions)
+ assertEquals(1, oversizedPartitions.size)
+ // the oversized message is at offset 0 -- assumes tp holds no earlier records when this test runs; TODO confirm
+ assertEquals(0L, oversizedPartitions.get(tp))
+
+ consumer0.close()
+ }
}