Posted to commits@kafka.apache.org by jj...@apache.org on 2016/02/05 01:08:42 UTC

kafka git commit: KAFKA-3179; Fix seek on compressed messages

Repository: kafka
Updated Branches:
  refs/heads/trunk 7802a90ed -> db8d6f02c


KAFKA-3179; Fix seek on compressed messages

The fix itself is simple: skip any fetched log entries whose offsets are earlier than the consumer's current position, since a fetch that lands inside a compressed message set returns the entire wrapper message, including entries at earlier offsets.

Some explanation on the unit tests: currently the vast majority of unit tests run with uncompressed messages. I initially considered running all the tests with compressed messages, but uncompressed messages are necessary in many test cases because we need the bytes sent and appended to the log to be predictable. In most other cases it does not matter whether the messages are compressed, and compression would slow the unit tests down. So I added a method to BaseConsumerTest to send compressed messages whenever we need them.

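For context, the user-visible symptom: after a seek(), poll() could return messages from before the sought offset whenever that offset fell inside a compressed message set, because the broker returns the compressed wrapper message as a whole. A minimal sketch of the scenario against the Java consumer API (the bootstrap address and topic name below are placeholders, not part of this patch):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class SeekCompressedRepro {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("compressed-topic", 0); // placeholder topic
                consumer.assign(Collections.singletonList(tp));
                consumer.seek(tp, 25L);
                // Before this patch, when offset 25 sat inside a compressed message set,
                // poll() could hand back records with offsets below 25.
                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(1000))
                    System.out.println(record.offset()); // with the fix: nothing below 25
            }
        }
    }
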
Author: Jiangjie Qin <be...@gmail.com>

Reviewers: Aditya Auradkar <aa...@linkedin.com>, Ismael Juma <is...@juma.me.uk>, Joel Koshy <jj...@gmail.com>

Closes #842 from becketqin/KAFKA-3179


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/db8d6f02
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/db8d6f02
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/db8d6f02

Branch: refs/heads/trunk
Commit: db8d6f02c092c42f2402b7e2587c1b28d330bf83
Parents: 7802a90
Author: Jiangjie Qin <be...@gmail.com>
Authored: Thu Feb 4 16:08:21 2016 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Thu Feb 4 16:08:21 2016 -0800

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     |  7 +++-
 .../clients/consumer/internals/FetcherTest.java | 10 ++++-
 .../kafka/api/BaseConsumerTest.scala            | 31 ++++++++++------
 .../kafka/api/PlaintextConsumerTest.scala       | 39 ++++++++++++++++++--
 4 files changed, 69 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/db8d6f02/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index c06e899..e8f1f55 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -559,8 +559,11 @@ public class Fetcher<K, V> {
                 MemoryRecords records = MemoryRecords.readableRecords(buffer);
                 List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
                 for (LogEntry logEntry : records) {
-                    parsed.add(parseRecord(tp, logEntry));
-                    bytes += logEntry.size();
+                    // Skip messages earlier than the current position.
+                    if (logEntry.offset() >= position) {
+                        parsed.add(parseRecord(tp, logEntry));
+                        bytes += logEntry.size();
+                    }
                 }
 
                 if (!parsed.isEmpty()) {

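The guard above matters because compressed message sets travel as a unit: a fetch positioned at offset 4 can come back holding the whole wrapper covering offsets 3-5. A small self-contained sketch of the same skip logic, using a hypothetical Entry type in place of Kafka's internal LogEntry:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class SkipBeforePositionSketch {
        // Hypothetical stand-in for LogEntry; only the offset matters here.
        static final class Entry {
            final long offset;
            Entry(long offset) { this.offset = offset; }
        }

        // Keep only entries at or after the consumer's current position.
        static List<Entry> skipBeforePosition(List<Entry> fetched, long position) {
            List<Entry> parsed = new ArrayList<>();
            for (Entry entry : fetched)
                if (entry.offset >= position)
                    parsed.add(entry);
            return parsed;
        }

        public static void main(String[] args) {
            // A compressed wrapper covering offsets 3-5, returned for a fetch at position 4.
            List<Entry> wrapper = Arrays.asList(new Entry(3), new Entry(4), new Entry(5));
            System.out.println(skipBeforePosition(wrapper, 4).size()); // prints 2: offsets 4 and 5 survive
        }
    }
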
http://git-wip-us.apache.org/repos/asf/kafka/blob/db8d6f02/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 79e47c0..5e750fd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -464,8 +464,16 @@ public class FetcherTest {
 
         // normal fetch
         for (int i = 1; i < 4; i++) {
+            // We need to make sure the message offsets grow. Otherwise the messages will be considered
+            // already consumed and filtered out by the consumer.
+            if (i > 1) {
+                this.records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
+                for (int v = 0; v < 3; v++) {
+                    this.records.append((long) i * 3 + v, "key".getBytes(), String.format("value-%d", v).getBytes());
+                }
+                this.records.close();
+            }
             fetcher.initFetches(cluster);
-
             client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 100 * i));
             consumerClient.poll(0);
             records = fetcher.fetchedRecords().get(tp);

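Note the arithmetic in the loop above: iteration i appends offsets i * 3 through i * 3 + 2 (6-8 for i = 2, 9-11 for i = 3), so each prepared response stays ahead of the position the fetcher reached on the previous pass. Reusing the same record buffer on every iteration would now yield empty results after the first pass, since the new offset >= position guard would drop every entry as already consumed.
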
http://git-wip-us.apache.org/repos/asf/kafka/blob/db8d6f02/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 819e690..eb24706 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -15,7 +15,7 @@ package kafka.api
 import java.util
 
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.{Producer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.{PartitionInfo, TopicPartition}
 
@@ -73,7 +73,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     assertEquals(0, this.consumers(0).assignment.size)
     this.consumers(0).assign(List(tp).asJava)
     assertEquals(1, this.consumers(0).assignment.size)
-    
+
     this.consumers(0).seek(tp, 0)
     consumeAndVerifyRecords(this.consumers(0), numRecords = numRecords, startingOffset = 0)
 
@@ -143,20 +143,20 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     this.consumers(0).poll(50)
     val pos1 = this.consumers(0).position(tp)
     val pos2 = this.consumers(0).position(tp2)
-    this.consumers(0).commitSync(Map[TopicPartition,OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava)
+    this.consumers(0).commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava)
     assertEquals(3, this.consumers(0).committed(tp).offset)
     assertNull(this.consumers(0).committed(tp2))
 
     // positions should not change
     assertEquals(pos1, this.consumers(0).position(tp))
     assertEquals(pos2, this.consumers(0).position(tp2))
-    this.consumers(0).commitSync(Map[TopicPartition,OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava)
+    this.consumers(0).commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava)
     assertEquals(3, this.consumers(0).committed(tp).offset)
     assertEquals(5, this.consumers(0).committed(tp2).offset)
 
     // Using async should pick up the committed changes after commit completes
     val commitCallback = new CountConsumerCommitCallback()
-    this.consumers(0).commitAsync(Map[TopicPartition,OffsetAndMetadata]((tp2, new OffsetAndMetadata(7L))).asJava, commitCallback)
+    this.consumers(0).commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(7L))).asJava, commitCallback)
     awaitCommitCallback(this.consumers(0), commitCallback)
     assertEquals(7, this.consumers(0).committed(tp2).offset)
   }
@@ -259,10 +259,12 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
   protected class TestConsumerReassignmentListener extends ConsumerRebalanceListener {
     var callsToAssigned = 0
     var callsToRevoked = 0
+
     def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]) {
       info("onPartitionsAssigned called.")
       callsToAssigned += 1
     }
+
     def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]) {
       info("onPartitionsRevoked called.")
       callsToRevoked += 1
@@ -274,13 +276,20 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
   }
 
   protected def sendRecords(numRecords: Int, tp: TopicPartition) {
-    (0 until numRecords).map { i =>
-      this.producers(0).send(new ProducerRecord(tp.topic(), tp.partition(), s"key $i".getBytes, s"value $i".getBytes))
-    }.foreach(_.get)
+    sendRecords(this.producers(0), numRecords, tp)
+  }
+
+  protected def sendRecords(producer: Producer[Array[Byte], Array[Byte]],
+                            numRecords: Int,
+                            tp: TopicPartition) {
+    (0 until numRecords).foreach { i =>
+      producer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key $i".getBytes, s"value $i".getBytes))
+    }
+    producer.flush()
   }
 
   protected def consumeAndVerifyRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int,
-                                      startingKeyAndValueIndex: Int = 0) {
+                                      startingKeyAndValueIndex: Int = 0, tp: TopicPartition = tp) {
     val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
     val maxIters = numRecords * 300
     var iters = 0
@@ -294,8 +303,8 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     for (i <- 0 until numRecords) {
       val record = records.get(i)
       val offset = startingOffset + i
-      assertEquals(topic, record.topic())
-      assertEquals(part, record.partition())
+      assertEquals(tp.topic(), record.topic())
+      assertEquals(tp.partition(), record.partition())
       assertEquals(offset.toLong, record.offset())
       val keyAndValueIndex = startingKeyAndValueIndex + i
       assertEquals(s"key $keyAndValueIndex", new String(record.key()))

http://git-wip-us.apache.org/repos/asf/kafka/blob/db8d6f02/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 47b5d8f..6711edf 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -12,13 +12,15 @@
   */
 package kafka.api
 
+import java.util.Properties
 import java.util.regex.Pattern
 
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.record.CompressionType
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.errors.{InvalidTopicException, RecordTooLargeException}
 import org.junit.Assert._
@@ -266,7 +268,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   def testSeek() {
     val consumer = this.consumers(0)
     val totalRecords = 50L
-    sendRecords(totalRecords.toInt)
+    val mid = totalRecords / 2
+
+    // Test seek non-compressed message
+    sendRecords(totalRecords.toInt, tp)
     consumer.assign(List(tp).asJava)
 
     consumer.seekToEnd(tp)
@@ -277,10 +282,36 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(0, consumer.position(tp), 0)
     consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0)
 
-    val mid = totalRecords / 2
     consumer.seek(tp, mid)
     assertEquals(mid, consumer.position(tp))
     consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt)
+
+    // Test seek compressed message
+    sendCompressedMessages(totalRecords.toInt, tp2)
+    consumer.assign(List(tp2).asJava)
+
+    consumer.seekToEnd(tp2)
+    assertEquals(totalRecords, consumer.position(tp2))
+    assertFalse(consumer.poll(totalRecords).iterator().hasNext)
+
+    consumer.seekToBeginning(tp2)
+    assertEquals(0, consumer.position(tp2), 0)
+    consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0, tp = tp2)
+
+    consumer.seek(tp2, mid)
+    assertEquals(mid, consumer.position(tp2))
+    consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt,
+      tp = tp2)
+  }
+
+  private def sendCompressedMessages(numRecords: Int, tp: TopicPartition) {
+    val producerProps = new Properties()
+    producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.GZIP.name)
+    producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, Long.MaxValue.toString)
+    val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
+        retries = 0, lingerMs = Long.MaxValue, props = Some(producerProps))
+    sendRecords(producer, numRecords, tp)
+    producer.close()
   }
 
   def testPositionAndCommit() {
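
The Long.MaxValue linger is the important detail in the helper above: it keeps the producer from sending anything until flush(), so all the records accumulate in one batch and reach the broker as a single gzip-compressed message set, which is exactly the shape that triggers the code path fixed in Fetcher.java. A rough Java equivalent of the Scala helper (bootstrap address and topic are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class CompressedBatchProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
            // An effectively infinite linger keeps every record in the accumulator
            // until flush(), so they are compressed together into one message set.
            props.put(ProducerConfig.LINGER_MS_CONFIG, String.valueOf(Long.MaxValue));
            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                for (int i = 0; i < 50; i++)
                    producer.send(new ProducerRecord<>("compressed-topic", 0,
                            ("key " + i).getBytes(), ("value " + i).getBytes()));
                producer.flush(); // force the single compressed batch out
            }
        }
    }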