Posted to commits@kafka.apache.org by ij...@apache.org on 2016/07/28 10:18:42 UTC
kafka git commit: KAFKA-3977; Defer fetch parsing for space efficiency and to ensure exceptions are raised to the user [Forced Update!]
Repository: kafka
Updated Branches:
refs/heads/trunk a750c5672 -> ff557f02a (forced update)
KAFKA-3977; Defer fetch parsing for space efficiency and to ensure exceptions are raised to the user
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ewen Cheslack-Postava <me...@ewencp.org>, Ismael Juma <is...@juma.me.uk>
Closes #1656 from hachikuji/KAFKA-3977
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ff557f02
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ff557f02
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ff557f02
Branch: refs/heads/trunk
Commit: ff557f02ac628edbe220ea69888d39de834527d3
Parents: d5c821c
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Jul 28 11:15:45 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Jul 28 11:18:25 2016 +0100
----------------------------------------------------------------------
.../clients/consumer/internals/Fetcher.java | 325 ++++++++++---------
.../apache/kafka/common/record/Compressor.java | 4 +-
.../common/record/InvalidRecordException.java | 4 +-
.../clients/consumer/internals/FetcherTest.java | 153 +++++++--
4 files changed, 310 insertions(+), 176 deletions(-)
----------------------------------------------------------------------
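The behavioral change for consumer applications: per-partition fetch errors (offset out of range, authorization, record too large, CRC, deserialization) are no longer buffered in side collections and thrown from a later fetchedRecords() call; each is raised from the poll that would have returned the affected records, before the consumed position is advanced. A minimal sketch of the resulting application-side handling (a hypothetical consumer loop; 'consumer' and 'running' are assumed to exist and the recovery policy is illustrative, not part of this patch):

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.KafkaException;

    while (running) {
        try {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
            // process records ...
        } catch (KafkaException e) {
            // With this patch, CRC failures (InvalidRecordException) and
            // deserialization failures (SerializationException) both extend
            // KafkaException and are raised here, on the poll that hit the
            // bad data. The position has not advanced, so the application
            // can log and seek past the bad offset, or fail fast.
        }
    }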
http://git-wip-us.apache.org/repos/asf/kafka/blob/ff557f02/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index ddfb584..c811a03 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.metrics.Metrics;
@@ -38,8 +39,10 @@ import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
@@ -59,7 +62,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -83,13 +85,11 @@ public class Fetcher<K, V> {
private final Metadata metadata;
private final FetchManagerMetrics sensors;
private final SubscriptionState subscriptions;
- private final List<PartitionRecords<K, V>> records;
+ private final List<CompletedFetch> completedFetches;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
- private final Map<TopicPartition, Long> offsetOutOfRangePartitions;
- private final Set<String> unauthorizedTopics;
- private final Map<TopicPartition, Long> recordTooLargePartitions;
+ private PartitionRecords<K, V> nextInLineRecords = null;
public Fetcher(ConsumerNetworkClient client,
int minBytes,
@@ -105,7 +105,6 @@ public class Fetcher<K, V> {
String metricGrpPrefix,
Time time,
long retryBackoffMs) {
-
this.time = time;
this.client = client;
this.metadata = metadata;
@@ -115,31 +114,37 @@ public class Fetcher<K, V> {
this.fetchSize = fetchSize;
this.maxPollRecords = maxPollRecords;
this.checkCrcs = checkCrcs;
-
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
-
- this.records = new LinkedList<>();
- this.offsetOutOfRangePartitions = new HashMap<>();
- this.unauthorizedTopics = new HashSet<>();
- this.recordTooLargePartitions = new HashMap<>();
-
+ this.completedFetches = new ArrayList<>();
this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix);
this.retryBackoffMs = retryBackoffMs;
}
/**
- * Set-up a fetch request for any node that we have assigned partitions for which doesn't have one.
- *
+ * Set up a fetch request for any node that we have assigned partitions for and which doesn't already
+ * have an in-flight fetch or pending fetch data.
*/
public void sendFetches() {
for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests().entrySet()) {
- final FetchRequest fetch = fetchEntry.getValue();
- client.send(fetchEntry.getKey(), ApiKeys.FETCH, fetch)
+ final FetchRequest request = fetchEntry.getValue();
+ client.send(fetchEntry.getKey(), ApiKeys.FETCH, request)
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
- public void onSuccess(ClientResponse response) {
- handleFetchResponse(response, fetch);
+ public void onSuccess(ClientResponse resp) {
+ FetchResponse response = new FetchResponse(resp.responseBody());
+ Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
+ FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
+
+ for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
+ TopicPartition partition = entry.getKey();
+ long fetchOffset = request.fetchData().get(partition).offset;
+ FetchResponse.PartitionData fetchData = entry.getValue();
+ completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator));
+ }
+
+ sensors.fetchLatency.record(resp.requestLatencyMs());
+ sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
}
@Override
@@ -152,7 +157,7 @@ public class Fetcher<K, V> {
/**
* Update the fetch positions for the provided partitions.
- * @param partitions
+ * @param partitions the partitions to update positions for
* @throws NoOffsetForPartitionException If no offset is stored for a given partition and no reset policy is available
*/
public void updateFetchPositions(Set<TopicPartition> partitions) {
@@ -324,62 +329,6 @@ public class Fetcher<K, V> {
}
/**
- * If any partition from previous fetchResponse contains OffsetOutOfRange error and
- * the defaultResetPolicy is NONE, throw OffsetOutOfRangeException
- *
- * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse
- */
- private void throwIfOffsetOutOfRange() throws OffsetOutOfRangeException {
- Map<TopicPartition, Long> currentOutOfRangePartitions = new HashMap<>();
-
- // filter offsetOutOfRangePartitions to retain only the fetchable partitions
- for (Map.Entry<TopicPartition, Long> entry: this.offsetOutOfRangePartitions.entrySet()) {
- if (!subscriptions.isFetchable(entry.getKey())) {
- log.debug("Ignoring fetched records for {} since it is no longer fetchable", entry.getKey());
- continue;
- }
- Long position = subscriptions.position(entry.getKey());
- // ignore partition if the current position != the offset in fetchResponse, e.g. after seek()
- if (position != null && entry.getValue().equals(position))
- currentOutOfRangePartitions.put(entry.getKey(), entry.getValue());
- }
- this.offsetOutOfRangePartitions.clear();
- if (!currentOutOfRangePartitions.isEmpty())
- throw new OffsetOutOfRangeException(currentOutOfRangePartitions);
- }
-
- /**
- * If any topic from previous fetchResponse contains an Authorization error, raise an exception
- * @throws TopicAuthorizationException
- */
- private void throwIfUnauthorizedTopics() throws TopicAuthorizationException {
- if (!unauthorizedTopics.isEmpty()) {
- Set<String> topics = new HashSet<>(unauthorizedTopics);
- unauthorizedTopics.clear();
- throw new TopicAuthorizationException(topics);
- }
- }
-
- /**
- * If any partition from previous fetchResponse gets a RecordTooLarge error, throw RecordTooLargeException
- *
- * @throws RecordTooLargeException If there is a message larger than fetch size and hence cannot be ever returned
- */
- private void throwIfRecordTooLarge() throws RecordTooLargeException {
- Map<TopicPartition, Long> copiedRecordTooLargePartitions = new HashMap<>(this.recordTooLargePartitions);
- this.recordTooLargePartitions.clear();
-
- if (!copiedRecordTooLargePartitions.isEmpty())
- throw new RecordTooLargeException("There are some messages at [Partition=Offset]: "
- + copiedRecordTooLargePartitions
- + " whose size is larger than the fetch size "
- + this.fetchSize
- + " and hence cannot be ever returned."
- + " Increase the fetch size, or decrease the maximum message size the broker will allow.",
- copiedRecordTooLargePartitions);
- }
-
- /**
* Return the fetched records, empty the record buffer and update the consumed position.
*
* NOTE: returning empty records guarantees the consumed positions are NOT updated.
@@ -393,60 +342,68 @@ public class Fetcher<K, V> {
return Collections.emptyMap();
} else {
Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
- throwIfOffsetOutOfRange();
- throwIfUnauthorizedTopics();
- throwIfRecordTooLarge();
-
- int maxRecords = maxPollRecords;
- Iterator<PartitionRecords<K, V>> iterator = records.iterator();
- while (iterator.hasNext() && maxRecords > 0) {
- PartitionRecords<K, V> part = iterator.next();
- maxRecords -= append(drained, part, maxRecords);
- if (part.isConsumed())
- iterator.remove();
+ int recordsRemaining = maxPollRecords;
+ Iterator<CompletedFetch> completedFetchesIterator = completedFetches.iterator();
+
+ while (recordsRemaining > 0) {
+ if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
+ if (!completedFetchesIterator.hasNext())
+ break;
+
+ CompletedFetch completion = completedFetchesIterator.next();
+ completedFetchesIterator.remove();
+ nextInLineRecords = parseFetchedData(completion);
+ } else {
+ recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
+ }
}
+
return drained;
}
}
private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained,
- PartitionRecords<K, V> part,
+ PartitionRecords<K, V> partitionRecords,
int maxRecords) {
- if (!subscriptions.isAssigned(part.partition)) {
+ if (partitionRecords.isEmpty())
+ return 0;
+
+ if (!subscriptions.isAssigned(partitionRecords.partition)) {
// this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
- log.debug("Not returning fetched records for partition {} since it is no longer assigned", part.partition);
+ log.debug("Not returning fetched records for partition {} since it is no longer assigned", partitionRecords.partition);
} else {
// note that the consumed position should always be available as long as the partition is still assigned
- long position = subscriptions.position(part.partition);
- if (!subscriptions.isFetchable(part.partition)) {
+ long position = subscriptions.position(partitionRecords.partition);
+ if (!subscriptions.isFetchable(partitionRecords.partition)) {
// this can happen when a partition is paused before fetched records are returned to the consumer's poll call
- log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", part.partition);
- } else if (part.fetchOffset == position) {
- List<ConsumerRecord<K, V>> partRecords = part.take(maxRecords);
+ log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition);
+ } else if (partitionRecords.fetchOffset == position) {
+ // we are ensured to have at least one record since we already checked for emptiness
+ List<ConsumerRecord<K, V>> partRecords = partitionRecords.take(maxRecords);
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
- "position to {}", position, part.partition, nextOffset);
+ "position to {}", position, partitionRecords.partition, nextOffset);
- List<ConsumerRecord<K, V>> records = drained.get(part.partition);
+ List<ConsumerRecord<K, V>> records = drained.get(partitionRecords.partition);
if (records == null) {
records = partRecords;
- drained.put(part.partition, records);
+ drained.put(partitionRecords.partition, records);
} else {
records.addAll(partRecords);
}
- subscriptions.position(part.partition, nextOffset);
+ subscriptions.position(partitionRecords.partition, nextOffset);
return partRecords.size();
} else {
// these records aren't next in line based on the last consumed position, ignore them
// they must be from an obsolete request
log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
- part.partition, part.fetchOffset, position);
+ partitionRecords.partition, partitionRecords.fetchOffset, position);
}
}
- part.discard();
+ partitionRecords.discard();
return 0;
}
@@ -513,10 +470,10 @@ public class Fetcher<K, V> {
private Set<TopicPartition> fetchablePartitions() {
Set<TopicPartition> fetchable = subscriptions.fetchablePartitions();
- if (records.isEmpty())
- return fetchable;
- for (PartitionRecords<K, V> partitionRecords : records)
- fetchable.remove(partitionRecords.partition);
+ if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
+ fetchable.remove(nextInLineRecords.partition);
+ for (CompletedFetch completedFetch : completedFetches)
+ fetchable.remove(completedFetch.partition);
return fetchable;
}
@@ -559,30 +516,29 @@ public class Fetcher<K, V> {
/**
* The callback for fetch completion
*/
- private void handleFetchResponse(ClientResponse resp, FetchRequest request) {
- int totalBytes = 0;
- int totalCount = 0;
- FetchResponse response = new FetchResponse(resp.responseBody());
- for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
- TopicPartition tp = entry.getKey();
- FetchResponse.PartitionData partition = entry.getValue();
+ private PartitionRecords<K, V> parseFetchedData(CompletedFetch completedFetch) {
+ TopicPartition tp = completedFetch.partition;
+ FetchResponse.PartitionData partition = completedFetch.partitionData;
+ long fetchOffset = completedFetch.fetchedOffset;
+ int bytes = 0;
+ int recordsCount = 0;
+ PartitionRecords<K, V> parsedRecords = null;
+
+ try {
if (!subscriptions.isFetchable(tp)) {
// this can happen when a rebalance happened or a partition consumption paused
// while fetch is still in-flight
log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
} else if (partition.errorCode == Errors.NONE.code()) {
- long fetchOffset = request.fetchData().get(tp).offset;
-
// we are interested in this fetch only if the beginning offset matches the
// current consumed position
Long position = subscriptions.position(tp);
if (position == null || position != fetchOffset) {
- log.debug("Discarding fetch response for partition {} since its offset {} does not match " +
+ log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
"the expected offset {}", tp, fetchOffset, position);
- continue;
+ return null;
}
- int bytes = 0;
ByteBuffer buffer = partition.recordSet;
MemoryRecords records = MemoryRecords.readableRecords(buffer);
List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
@@ -597,79 +553,95 @@ public class Fetcher<K, V> {
}
}
+ recordsCount = parsed.size();
+ this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, recordsCount);
+
if (!parsed.isEmpty()) {
log.trace("Adding fetched record for partition {} with offset {} to buffered record list", tp, position);
+ parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
- this.records.add(new PartitionRecords<>(fetchOffset, tp, parsed));
this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
} else if (buffer.limit() > 0 && !skippedRecords) {
// we did not read a single message from a non-empty buffer
// because that message's size is larger than fetch size, in this case
// record this exception
- this.recordTooLargePartitions.put(tp, fetchOffset);
+ Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
+ throw new RecordTooLargeException("There are some messages at [Partition=Offset]: "
+ + recordTooLargePartitions
+ + " whose size is larger than the fetch size "
+ + this.fetchSize
+ + " and hence cannot be ever returned."
+ + " Increase the fetch size on the client (using max.partition.fetch.bytes),"
+ + " or decrease the maximum message size the broker will allow (using message.max.bytes).",
+ recordTooLargePartitions);
}
-
- this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size());
- totalBytes += bytes;
- totalCount += parsed.size();
} else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
- || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
+ || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
this.metadata.requestUpdate();
} else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
- long fetchOffset = request.fetchData().get(tp).offset;
- if (subscriptions.hasDefaultOffsetResetPolicy())
+ if (fetchOffset != subscriptions.position(tp)) {
+ log.debug("Discarding stale fetch response for partition {} since the fetched offset {}" +
+ "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
+ } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
+ log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
subscriptions.needOffsetReset(tp);
- else
- this.offsetOutOfRangePartitions.put(tp, fetchOffset);
- log.info("Fetch offset {} is out of range, resetting offset", fetchOffset);
+ } else {
+ throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
+ }
} else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
log.warn("Not authorized to read from topic {}.", tp.topic());
- unauthorizedTopics.add(tp.topic());
+ throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
} else if (partition.errorCode == Errors.UNKNOWN.code()) {
log.warn("Unknown error fetching data for topic-partition {}", tp);
} else {
throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data");
}
+ } finally {
+ completedFetch.metricAggregator.record(tp, bytes, recordsCount);
}
- this.sensors.bytesFetched.record(totalBytes);
- this.sensors.recordsFetched.record(totalCount);
- this.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
- this.sensors.fetchLatency.record(resp.requestLatencyMs());
+
+ return parsedRecords;
}
/**
* Parse the record entry, deserializing the key / value fields if necessary
*/
private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) {
+ Record record = logEntry.record();
+
+ if (this.checkCrcs && !record.isValid())
+ throw new InvalidRecordException("Record for partition " + partition + " at offset "
+ + logEntry.offset() + " is corrupt (stored crc = " + record.checksum()
+ + ", computed crc = "
+ + record.computeChecksum()
+ + ")");
+
try {
- if (this.checkCrcs)
- logEntry.record().ensureValid();
long offset = logEntry.offset();
- long timestamp = logEntry.record().timestamp();
- TimestampType timestampType = logEntry.record().timestampType();
- ByteBuffer keyBytes = logEntry.record().key();
+ long timestamp = record.timestamp();
+ TimestampType timestampType = record.timestampType();
+ ByteBuffer keyBytes = record.key();
byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), keyByteArray);
- ByteBuffer valueBytes = logEntry.record().value();
+ ByteBuffer valueBytes = record.value();
byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), valueByteArray);
return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
- timestamp, timestampType, logEntry.record().checksum(),
+ timestamp, timestampType, record.checksum(),
keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
key, value);
- } catch (KafkaException e) {
- throw e;
} catch (RuntimeException e) {
- throw new KafkaException("Error deserializing key/value for partition " + partition + " at offset " + logEntry.offset(), e);
+ throw new SerializationException("Error deserializing key/value for partition " + partition +
+ " at offset " + logEntry.offset(), e);
}
}
private static class PartitionRecords<K, V> {
- public long fetchOffset;
- public TopicPartition partition;
- public List<ConsumerRecord<K, V>> records;
+ private long fetchOffset;
+ private TopicPartition partition;
+ private List<ConsumerRecord<K, V>> records;
public PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) {
this.fetchOffset = fetchOffset;
@@ -677,7 +649,7 @@ public class Fetcher<K, V> {
this.records = records;
}
- private boolean isConsumed() {
+ private boolean isEmpty() {
return records == null || records.isEmpty();
}
@@ -687,7 +659,7 @@ public class Fetcher<K, V> {
private List<ConsumerRecord<K, V>> take(int n) {
if (records == null)
- return Collections.emptyList();
+ return new ArrayList<>();
if (n >= records.size()) {
List<ConsumerRecord<K, V>> res = this.records;
@@ -709,7 +681,59 @@ public class Fetcher<K, V> {
}
}
- private class FetchManagerMetrics {
+ private static class CompletedFetch {
+ private final TopicPartition partition;
+ private final long fetchedOffset;
+ private final FetchResponse.PartitionData partitionData;
+ private final FetchResponseMetricAggregator metricAggregator;
+
+ public CompletedFetch(TopicPartition partition,
+ long fetchedOffset,
+ FetchResponse.PartitionData partitionData,
+ FetchResponseMetricAggregator metricAggregator) {
+ this.partition = partition;
+ this.fetchedOffset = fetchedOffset;
+ this.partitionData = partitionData;
+ this.metricAggregator = metricAggregator;
+ }
+ }
+
+ /**
+ * Since we parse the message data for each partition from each fetch response lazily, fetch-level
+ * metrics need to be aggregated as the messages from each partition are parsed. This class is used
+ * to facilitate this incremental aggregation.
+ */
+ private static class FetchResponseMetricAggregator {
+ private final FetchManagerMetrics sensors;
+ private final Set<TopicPartition> unrecordedPartitions;
+
+ private int totalBytes;
+ private int totalRecords;
+
+ public FetchResponseMetricAggregator(FetchManagerMetrics sensors,
+ Set<TopicPartition> partitions) {
+ this.sensors = sensors;
+ this.unrecordedPartitions = partitions;
+ }
+
+ /**
+ * After each partition is parsed, we update the current metric totals with the total bytes
+ * and number of records parsed. After all partitions have reported, we write the metric.
+ */
+ public void record(TopicPartition partition, int bytes, int records) {
+ unrecordedPartitions.remove(partition);
+ totalBytes += bytes;
+ totalRecords += records;
+
+ if (unrecordedPartitions.isEmpty()) {
+ // once all expected partitions from the fetch have reported in, record the metrics
+ sensors.bytesFetched.record(totalBytes);
+ sensors.recordsFetched.record(totalRecords);
+ }
+ }
+ }
+
+ private static class FetchManagerMetrics {
public final Metrics metrics;
public final String metricGrpName;
@@ -719,7 +743,6 @@ public class Fetcher<K, V> {
public final Sensor recordsFetchLag;
public final Sensor fetchThrottleTimeSensor;
-
public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix) {
this.metrics = metrics;
this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics";
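The shape of the refactor above, stripped of Kafka internals: responses are enqueued raw and decoded only when drained, so the buffer holds compact undecoded bytes and decode errors surface from the draining call, not from a background callback. A simplified, self-contained model (names like DeferredFetchBuffer are illustrative, not the actual Fetcher API):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;

    class DeferredFetchBuffer {
        // completed responses kept as raw bytes: cheap to buffer, nothing decoded yet
        private final Queue<ByteBuffer> completed = new ArrayDeque<>();

        // analogous to the sendFetches() listener above: enqueue, never parse
        void onResponse(ByteBuffer raw) {
            completed.add(raw);
        }

        // analogous to fetchedRecords(): decode lazily, so any decode error is
        // raised here, before the caller's position is advanced
        List<String> drain() {
            List<String> out = new ArrayList<>();
            while (!completed.isEmpty())
                out.addAll(parse(completed.poll()));
            return out;
        }

        // stand-in for record decoding and CRC validation; length-prefixed UTF-8
        private List<String> parse(ByteBuffer raw) {
            List<String> records = new ArrayList<>();
            while (raw.remaining() >= 4) {
                byte[] bytes = new byte[raw.getInt()];
                raw.get(bytes); // throws on a truncated buffer, i.e. the "decode error" point
                records.add(new String(bytes, StandardCharsets.UTF_8));
            }
            return records;
        }
    }

The real Fetcher additionally keeps one partially-drained PartitionRecords (nextInLineRecords) so it can honor maxPollRecords across calls; this model omits that bookkeeping.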
http://git-wip-us.apache.org/repos/asf/kafka/blob/ff557f02/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index e23a52e..a806975 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -242,7 +242,7 @@ public class Compressor {
// the following two functions also need to be public since they are used in MemoryRecords.iteration
- static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
+ public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
try {
switch (type) {
case NONE:
@@ -271,7 +271,7 @@ public class Compressor {
}
}
- static public DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) {
+ public static DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) {
try {
switch (type) {
case NONE:
http://git-wip-us.apache.org/repos/asf/kafka/blob/ff557f02/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java b/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
index 5815b21..a1009ca 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
@@ -16,7 +16,9 @@
*/
package org.apache.kafka.common.record;
-public class InvalidRecordException extends RuntimeException {
+import org.apache.kafka.common.KafkaException;
+
+public class InvalidRecordException extends KafkaException {
private static final long serialVersionUID = 1;
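This two-line change is what lets the deferred CRC check propagate cleanly: InvalidRecordException now extends KafkaException, so generic Kafka error handling in application code also catches corrupt-record failures. A quick sanity check of the hierarchy (illustrative only):

    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.record.InvalidRecordException;

    public class HierarchyCheck {
        public static void main(String[] args) {
            // After this change a corrupt-record failure is a KafkaException
            // (and, transitively, a RuntimeException), so the new CRC check in
            // parseRecord can throw it directly, unwrapped, out of poll().
            KafkaException e = new InvalidRecordException("corrupt record");
            System.out.println(e instanceof RuntimeException); // true
        }
    }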
http://git-wip-us.apache.org/repos/asf/kafka/blob/ff557f02/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 8fad30f..2fbd43e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.metrics.KafkaMetric;
@@ -38,6 +39,8 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.Compressor;
+import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.requests.FetchRequest;
@@ -47,6 +50,7 @@ import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -128,7 +132,7 @@ public class FetcherTest {
consumerClient.poll(0);
records = fetcher.fetchedRecords().get(tp);
assertEquals(3, records.size());
- assertEquals(4L, (long) subscriptions.position(tp)); // this is the next fetching position
+ assertEquals(4L, subscriptions.position(tp).longValue()); // this is the next fetching position
long offset = 1;
for (ConsumerRecord<byte[], byte[]> record : records) {
assertEquals(offset, record.offset());
@@ -148,8 +152,82 @@ public class FetcherTest {
}
@Test
+ public void testFetchedRecordsRaisesOnSerializationErrors() {
+ // raise an exception from somewhere in the middle of the fetch response
+ // so that we can verify that our position does not advance after raising
+ ByteArrayDeserializer deserializer = new ByteArrayDeserializer() {
+ int i = 0;
+ @Override
+ public byte[] deserialize(String topic, byte[] data) {
+ if (i++ == 1)
+ throw new SerializationException();
+ return data;
+ }
+ };
+
+ Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), deserializer, deserializer);
+
+ subscriptions.assignFromUser(Collections.singleton(tp));
+ subscriptions.seek(tp, 1);
+
+ client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
+
+ fetcher.sendFetches();
+ consumerClient.poll(0);
+ try {
+ fetcher.fetchedRecords();
+ fail("fetchedRecords should have raised");
+ } catch (SerializationException e) {
+ // the position should not advance since no data has been returned
+ assertEquals(1, subscriptions.position(tp).longValue());
+ }
+ }
+
+ @Test
+ public void testParseInvalidRecord() {
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ Compressor compressor = new Compressor(buffer, CompressionType.NONE);
+
+ byte[] key = "foo".getBytes();
+ byte[] value = "baz".getBytes();
+ long offset = 0;
+ long timestamp = 500L;
+
+ int size = Record.recordSize(key, value);
+ long crc = Record.computeChecksum(timestamp, key, value, CompressionType.NONE, 0, -1);
+
+ // write one valid record
+ compressor.putLong(offset);
+ compressor.putInt(size);
+ Record.write(compressor, crc, Record.computeAttributes(CompressionType.NONE), timestamp, key, value, 0, -1);
+
+ // and one invalid record (note the crc)
+ compressor.putLong(offset);
+ compressor.putInt(size);
+ Record.write(compressor, crc + 1, Record.computeAttributes(CompressionType.NONE), timestamp, key, value, 0, -1);
+
+ compressor.close();
+ buffer.flip();
+
+ subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.seek(tp, 0);
+
+ // normal fetch
+ fetcher.sendFetches();
+ client.prepareResponse(fetchResponse(buffer, Errors.NONE.code(), 100L, 0));
+ consumerClient.poll(0);
+ try {
+ fetcher.fetchedRecords();
+ fail("fetchedRecords should have raised");
+ } catch (InvalidRecordException e) {
+ // the position should not advance since no data has been returned
+ assertEquals(0, subscriptions.position(tp).longValue());
+ }
+ }
+
+ @Test
public void testFetchMaxPollRecords() {
- Fetcher<byte[], byte[]> fetcher = createFetcher(2, subscriptions, new Metrics(time));
+ Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), 2);
List<ConsumerRecord<byte[], byte[]>> records;
subscriptions.assignFromUser(Arrays.asList(tp));
@@ -162,7 +240,7 @@ public class FetcherTest {
consumerClient.poll(0);
records = fetcher.fetchedRecords().get(tp);
assertEquals(2, records.size());
- assertEquals(3L, (long) subscriptions.position(tp));
+ assertEquals(3L, subscriptions.position(tp).longValue());
assertEquals(1, records.get(0).offset());
assertEquals(2, records.get(1).offset());
@@ -170,14 +248,14 @@ public class FetcherTest {
consumerClient.poll(0);
records = fetcher.fetchedRecords().get(tp);
assertEquals(1, records.size());
- assertEquals(4L, (long) subscriptions.position(tp));
+ assertEquals(4L, subscriptions.position(tp).longValue());
assertEquals(3, records.get(0).offset());
fetcher.sendFetches();
consumerClient.poll(0);
records = fetcher.fetchedRecords().get(tp);
assertEquals(2, records.size());
- assertEquals(6L, (long) subscriptions.position(tp));
+ assertEquals(6L, subscriptions.position(tp).longValue());
assertEquals(4, records.get(0).offset());
assertEquals(5, records.get(1).offset());
}
@@ -203,7 +281,7 @@ public class FetcherTest {
consumerClient.poll(0);
consumerRecords = fetcher.fetchedRecords().get(tp);
assertEquals(3, consumerRecords.size());
- assertEquals(31L, (long) subscriptions.position(tp)); // this is the next fetching position
+ assertEquals(31L, subscriptions.position(tp).longValue()); // this is the next fetching position
assertEquals(15L, consumerRecords.get(0).offset());
assertEquals(20L, consumerRecords.get(1).offset());
@@ -318,12 +396,28 @@ public class FetcherTest {
fetcher.sendFetches();
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
consumerClient.poll(0);
- assertTrue(subscriptions.isOffsetResetNeeded(tp));
assertEquals(0, fetcher.fetchedRecords().size());
+ assertTrue(subscriptions.isOffsetResetNeeded(tp));
assertEquals(null, subscriptions.position(tp));
}
@Test
+ public void testStaleOutOfRangeError() {
+ // verify that an out of range error which arrives after a seek
+ // does not cause us to reset our position or throw an exception
+ subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.seek(tp, 0);
+
+ fetcher.sendFetches();
+ client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
+ subscriptions.seek(tp, 1);
+ consumerClient.poll(0);
+ assertEquals(0, fetcher.fetchedRecords().size());
+ assertFalse(subscriptions.isOffsetResetNeeded(tp));
+ assertEquals(1, subscriptions.position(tp).longValue());
+ }
+
+ @Test
public void testFetchedRecordsAfterSeek() {
subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp));
subscriptionsNoAutoReset.seek(tp, 0);
@@ -368,7 +462,7 @@ public class FetcherTest {
// disconnects should have no effect on subscription state
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
- assertEquals(0, (long) subscriptions.position(tp));
+ assertEquals(0, subscriptions.position(tp).longValue());
}
@Test
@@ -380,7 +474,7 @@ public class FetcherTest {
fetcher.updateFetchPositions(Collections.singleton(tp));
assertTrue(subscriptions.isFetchable(tp));
- assertEquals(5, (long) subscriptions.position(tp));
+ assertEquals(5, subscriptions.position(tp).longValue());
}
@Test
@@ -393,7 +487,7 @@ public class FetcherTest {
fetcher.updateFetchPositions(Collections.singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
- assertEquals(5, (long) subscriptions.position(tp));
+ assertEquals(5, subscriptions.position(tp).longValue());
}
@Test
@@ -406,7 +500,7 @@ public class FetcherTest {
fetcher.updateFetchPositions(Collections.singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
- assertEquals(5, (long) subscriptions.position(tp));
+ assertEquals(5, subscriptions.position(tp).longValue());
}
@Test
@@ -419,7 +513,7 @@ public class FetcherTest {
fetcher.updateFetchPositions(Collections.singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
- assertEquals(5, (long) subscriptions.position(tp));
+ assertEquals(5, subscriptions.position(tp).longValue());
}
@Test
@@ -437,7 +531,7 @@ public class FetcherTest {
fetcher.updateFetchPositions(Collections.singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
- assertEquals(5, (long) subscriptions.position(tp));
+ assertEquals(5, subscriptions.position(tp).longValue());
}
@Test
@@ -575,17 +669,36 @@ public class FetcherTest {
return new MetadataResponse(cluster.nodes(), MetadataResponse.NO_CONTROLLER_ID, Arrays.asList(topicMetadata));
}
- private Fetcher<byte[], byte[]> createFetcher(int maxPollRecords,
- SubscriptionState subscriptions,
- Metrics metrics) {
+ private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions,
+ Metrics metrics,
+ int maxPollRecords) {
+ return createFetcher(subscriptions, metrics, new ByteArrayDeserializer(), new ByteArrayDeserializer(), maxPollRecords);
+ }
+
+ private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, Metrics metrics) {
+ return createFetcher(subscriptions, metrics, Integer.MAX_VALUE);
+ }
+
+ private <K, V> Fetcher<K, V> createFetcher(SubscriptionState subscriptions,
+ Metrics metrics,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> valueDeserializer) {
+ return createFetcher(subscriptions, metrics, keyDeserializer, valueDeserializer, Integer.MAX_VALUE);
+ }
+
+ private <K, V> Fetcher<K, V> createFetcher(SubscriptionState subscriptions,
+ Metrics metrics,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> valueDeserializer,
+ int maxPollRecords) {
return new Fetcher<>(consumerClient,
minBytes,
maxWaitMs,
fetchSize,
maxPollRecords,
true, // check crc
- new ByteArrayDeserializer(),
- new ByteArrayDeserializer(),
+ keyDeserializer,
+ valueDeserializer,
metadata,
subscriptions,
metrics,
@@ -594,8 +707,4 @@ public class FetcherTest {
retryBackoffMs);
}
-
- private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, Metrics metrics) {
- return createFetcher(Integer.MAX_VALUE, subscriptions, metrics);
- }
}