You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/17 02:34:56 UTC
kafka git commit: KAFKA-2515: Handle oversized messages properly in new consumer
Repository: kafka
Updated Branches:
refs/heads/trunk ef65d0a36 -> e2e5c8914
KAFKA-2515: Handle oversized messages properly in new consumer
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Dong Lin, Jun Rao
Closes #318 from guozhangwang/K2515
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e2e5c891
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e2e5c891
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e2e5c891
Branch: refs/heads/trunk
Commit: e2e5c891428e5f1288b6f4674983d924f8bdb779
Parents: ef65d0a
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Oct 16 17:39:39 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Oct 16 17:39:39 2015 -0700
----------------------------------------------------------------------
.../clients/consumer/internals/Fetcher.java | 32 +++++++++++++++++++-
.../common/errors/RecordTooLargeException.java | 13 ++++++++
.../clients/consumer/internals/FetcherTest.java | 21 +++++++++++++
.../integration/kafka/api/ConsumerTest.scala | 3 +-
4 files changed, 66 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2e5c891/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 4d68e74..3d02bfd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@@ -80,6 +81,7 @@ public class Fetcher<K, V> {
private final Deserializer<V> valueDeserializer;
private final Map<TopicPartition, Long> offsetOutOfRangePartitions;
+ private final Map<TopicPartition, Long> recordTooLargePartitions;
public Fetcher(ConsumerNetworkClient client,
int minBytes,
@@ -110,6 +112,7 @@ public class Fetcher<K, V> {
this.records = new LinkedList<PartitionRecords<K, V>>();
this.offsetOutOfRangePartitions = new HashMap<>();
+ this.recordTooLargePartitions = new HashMap<>();
this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
this.retryBackoffMs = retryBackoffMs;
@@ -288,6 +291,25 @@ public class Fetcher<K, V> {
}
/**
+ * If any partition from the previous fetch response was recorded as holding a too-large record, throw RecordTooLargeException
+ *
+ * @throws RecordTooLargeException If there is a message larger than the fetch size, which therefore can never be returned
+ */
+ private void throwIfRecordTooLarge() throws RecordTooLargeException {
+ Map<TopicPartition, Long> copiedRecordTooLargePartitions = new HashMap<>(this.recordTooLargePartitions);
+ this.recordTooLargePartitions.clear(); // cleared before throwing so the same partitions are not reported again on the next call
+
+ if (!copiedRecordTooLargePartitions.isEmpty())
+ throw new RecordTooLargeException("There are some messages at [Partition=Offset]: "
+ + copiedRecordTooLargePartitions
+ + " whose size is larger than the fetch size "
+ + this.fetchSize
+ + " and hence cannot be ever returned."
+ + " Increase the fetch size, or decrease the maximum message size the broker will allow.",
+ copiedRecordTooLargePartitions);
+ }
+
+ /**
* Return the fetched records, empty the record buffer and update the consumed position.
*
* @return The fetched records per partition
@@ -300,6 +322,7 @@ public class Fetcher<K, V> {
} else {
Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
throwIfOffsetOutOfRange();
+ throwIfRecordTooLarge();
for (PartitionRecords<K, V> part : this.records) {
if (!subscriptions.isAssigned(part.partition)) {
@@ -483,12 +506,19 @@ public class Fetcher<K, V> {
parsed.add(parseRecord(tp, logEntry));
bytes += logEntry.size();
}
+
if (!parsed.isEmpty()) {
ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
this.subscriptions.fetched(tp, record.offset() + 1);
- this.records.add(new PartitionRecords<K, V>(fetchOffset, tp, parsed));
+ this.records.add(new PartitionRecords<>(fetchOffset, tp, parsed));
this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
+ } else if (buffer.capacity() >= this.fetchSize) {
+ // we did not read a single message from a max fetchable buffer
+ // because that message's size is larger than fetch size, in this case
+ // record this exception
+ this.recordTooLargePartitions.put(tp, fetchOffset);
}
+
this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size());
totalBytes += bytes;
totalCount += parsed.size();
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2e5c891/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java b/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
index 737b7f0..d844445 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
@@ -12,12 +12,17 @@
*/
package org.apache.kafka.common.errors;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+
/**
* This record is larger than the maximum allowable size
*/
public class RecordTooLargeException extends ApiException {
private static final long serialVersionUID = 1L;
+ private Map<TopicPartition, Long> recordTooLargePartitions = null;
public RecordTooLargeException() {
super();
@@ -35,4 +40,12 @@ public class RecordTooLargeException extends ApiException {
super(cause);
}
+ public RecordTooLargeException(String message, Map<TopicPartition, Long> recordTooLargePartitions) {
+ super(message);
+ this.recordTooLargePartitions = recordTooLargePartitions;
+ }
+
+ public Map<TopicPartition, Long> recordTooLargePartitions() {
+ return recordTooLargePartitions;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2e5c891/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index f5f9ef1..4929449 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
@@ -53,6 +54,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -126,6 +128,25 @@ public class FetcherTest {
}
}
+ @Test(expected = RecordTooLargeException.class)
+ public void testFetchRecordTooLarge() {
+ subscriptions.assign(Arrays.asList(tp));
+ subscriptions.seek(tp, 0);
+
+ // prepare large record
+ MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
+ byte[] bytes = new byte[this.fetchSize];
+ new Random().nextBytes(bytes);
+ records.append(1L, null, bytes);
+ records.close();
+
+ // resize the limit of the buffer to pretend it is only fetch-size large
+ fetcher.initFetches(cluster);
+ client.prepareResponse(fetchResponse((ByteBuffer) records.flip().limit(this.fetchSize), Errors.NONE.code(), 100L, 0));
+ consumerClient.poll(0);
+ fetcher.fetchedRecords();
+ }
+
@Test
public void testFetchDuringRebalance() {
subscriptions.subscribe(Arrays.asList(topicName), listener);
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2e5c891/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index 0a02b03..a64c2f3 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -54,7 +54,6 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
this.serverConfig.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, "100") // set small enough session timeout
this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
- this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")
@@ -150,7 +149,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
val topic2 = "topic2"
TestUtils.createTopic(this.zkClient, topic2, 2, serverCount, this.servers)
- this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+ this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
val numRecords = 10000