Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/17 02:34:56 UTC

kafka git commit: KAFKA-2515: Handle oversized messages properly in new consumer

Repository: kafka
Updated Branches:
  refs/heads/trunk ef65d0a36 -> e2e5c8914


KAFKA-2515: Handle oversized messages properly in new consumer

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Dong Lin, Jun Rao

Closes #318 from guozhangwang/K2515


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e2e5c891
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e2e5c891
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e2e5c891

Branch: refs/heads/trunk
Commit: e2e5c891428e5f1288b6f4674983d924f8bdb779
Parents: ef65d0a
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Oct 16 17:39:39 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Oct 16 17:39:39 2015 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     | 32 +++++++++++++++++++-
 .../common/errors/RecordTooLargeException.java  | 13 ++++++++
 .../clients/consumer/internals/FetcherTest.java | 21 +++++++++++++
 .../integration/kafka/api/ConsumerTest.scala    |  3 +-
 4 files changed, 66 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e2e5c891/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 4d68e74..3d02bfd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -80,6 +81,7 @@ public class Fetcher<K, V> {
     private final Deserializer<V> valueDeserializer;
 
     private final Map<TopicPartition, Long> offsetOutOfRangePartitions;
+    private final Map<TopicPartition, Long> recordTooLargePartitions;
 
     public Fetcher(ConsumerNetworkClient client,
                    int minBytes,
@@ -110,6 +112,7 @@ public class Fetcher<K, V> {
 
         this.records = new LinkedList<PartitionRecords<K, V>>();
         this.offsetOutOfRangePartitions = new HashMap<>();
+        this.recordTooLargePartitions = new HashMap<>();
 
         this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
         this.retryBackoffMs = retryBackoffMs;
@@ -288,6 +291,25 @@ public class Fetcher<K, V> {
     }
 
     /**
+     * If any partition from the previous fetch response hit a RecordTooLarge error, throw RecordTooLargeException
+     *
+     * @throws RecordTooLargeException If there is a message larger than the fetch size, which can therefore never be returned
+     */
+    private void throwIfRecordTooLarge() throws RecordTooLargeException {
+        Map<TopicPartition, Long> copiedRecordTooLargePartitions = new HashMap<>(this.recordTooLargePartitions);
+        this.recordTooLargePartitions.clear();
+
+        if (!copiedRecordTooLargePartitions.isEmpty())
+            throw new RecordTooLargeException("There are some messages at [Partition=Offset]: "
+                + copiedRecordTooLargePartitions
+                + " whose size is larger than the fetch size "
+                + this.fetchSize
+                + " and hence can never be returned."
+                + " Increase the fetch size, or decrease the maximum message size the broker will allow.",
+                copiedRecordTooLargePartitions);
+    }
+
+    /**
      * Return the fetched records, empty the record buffer and update the consumed position.
      *
      * @return The fetched records per partition
@@ -300,6 +322,7 @@ public class Fetcher<K, V> {
         } else {
             Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
             throwIfOffsetOutOfRange();
+            throwIfRecordTooLarge();
 
             for (PartitionRecords<K, V> part : this.records) {
                 if (!subscriptions.isAssigned(part.partition)) {
@@ -483,12 +506,19 @@ public class Fetcher<K, V> {
                         parsed.add(parseRecord(tp, logEntry));
                         bytes += logEntry.size();
                     }
+
                     if (!parsed.isEmpty()) {
                         ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
                         this.subscriptions.fetched(tp, record.offset() + 1);
-                        this.records.add(new PartitionRecords<K, V>(fetchOffset, tp, parsed));
+                        this.records.add(new PartitionRecords<>(fetchOffset, tp, parsed));
                         this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
+                    } else if (buffer.capacity() >= this.fetchSize) {
+                        // we did not parse a single message out of a buffer that is already at
+                        // the maximum fetch size, so the next message must be larger than the
+                        // fetch size; record the partition and offset so we can raise the error
+                        this.recordTooLargePartitions.put(tp, fetchOffset);
                     }
+
                     this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size());
                     totalBytes += bytes;
                     totalCount += parsed.size();
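
With this patch an oversized message surfaces to the application as a RecordTooLargeException raised from KafkaConsumer.poll() (which calls fetchedRecords() above), instead of the consumer silently making no progress. Below is a minimal caller-side sketch, assuming a KafkaConsumer<String, String> named consumer; the class and method names are illustrative. It uses the recordTooLargePartitions() map added by this commit to seek past the offending offsets. Note that seeking forward skips the oversized records for this consumer; the alternative is to restart the consumer with a larger max.partition.fetch.bytes.

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordTooLargeException;

public class OversizedMessageExample {

    // Poll in a loop; when a message can never fit in the fetch size, seek past it
    // so that subsequent polls can make progress.
    public static void pollSkippingOversized(KafkaConsumer<String, String> consumer) {
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.println(record.offset() + ": " + record.value());
            } catch (RecordTooLargeException e) {
                // The exception now carries the partitions and the offsets of the
                // messages that exceeded the fetch size.
                for (Map.Entry<TopicPartition, Long> entry : e.recordTooLargePartitions().entrySet())
                    consumer.seek(entry.getKey(), entry.getValue() + 1);
            }
        }
    }
}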

http://git-wip-us.apache.org/repos/asf/kafka/blob/e2e5c891/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java b/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
index 737b7f0..d844445 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
@@ -12,12 +12,17 @@
  */
 package org.apache.kafka.common.errors;
 
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+
 /**
  * This record is larger than the maximum allowable size
  */
 public class RecordTooLargeException extends ApiException {
 
     private static final long serialVersionUID = 1L;
+    private Map<TopicPartition, Long> recordTooLargePartitions = null;
 
     public RecordTooLargeException() {
         super();
@@ -35,4 +40,12 @@ public class RecordTooLargeException extends ApiException {
         super(cause);
     }
 
+    public RecordTooLargeException(String message, Map<TopicPartition, Long> recordTooLargePartitions) {
+        super(message);
+        this.recordTooLargePartitions = recordTooLargePartitions;
+    }
+
+    public Map<TopicPartition, Long> recordTooLargePartitions() {
+        return recordTooLargePartitions;
+    }
 }
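
The new exception message advises increasing the fetch size; for the new consumer that knob is max.partition.fetch.bytes (ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, the same property the ConsumerTest change below stops pinning to 4096). Here is a minimal configuration sketch; the 2 MB value and the localhost bootstrap address are illustrative assumptions, and the value should be at least the broker's maximum message size so that no message can exceed the fetch size.

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class LargeFetchConfig {

    public static KafkaConsumer<byte[], byte[]> newConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-test");
        // Allow up to 2 MB per partition per fetch; keep this at or above the
        // broker's maximum message size so RecordTooLargeException cannot occur.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, Integer.toString(2 * 1024 * 1024));
        return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    }
}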

http://git-wip-us.apache.org/repos/asf/kafka/blob/e2e5c891/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index f5f9ef1..4929449 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
@@ -53,6 +54,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -126,6 +128,25 @@ public class FetcherTest {
         }
     }
 
+    @Test(expected = RecordTooLargeException.class)
+    public void testFetchRecordTooLarge() {
+        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.seek(tp, 0);
+
+        // prepare large record
+        MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
+        byte[] bytes = new byte[this.fetchSize];
+        new Random().nextBytes(bytes);
+        records.append(1L, null, bytes);
+        records.close();
+
+        // resize the limit of the buffer to pretend it is only fetch-size large
+        fetcher.initFetches(cluster);
+        client.prepareResponse(fetchResponse((ByteBuffer) records.flip().limit(this.fetchSize), Errors.NONE.code(), 100L, 0));
+        consumerClient.poll(0);
+        fetcher.fetchedRecords();
+    }
+
     @Test
     public void testFetchDuringRebalance() {
         subscriptions.subscribe(Arrays.asList(topicName), listener);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e2e5c891/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index 0a02b03..a64c2f3 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -54,7 +54,6 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
   this.serverConfig.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, "100") // set small enough session timeout
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
-  this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
   this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
   this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")
@@ -150,7 +149,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     val topic2 = "topic2"
     TestUtils.createTopic(this.zkClient, topic2, 2, serverCount, this.servers)
 
-    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
     val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
 
     val numRecords = 10000