Posted to dev@kafka.apache.org by "Chia-Ping Tsai (JIRA)" <ji...@apache.org> on 2018/06/11 10:38:00 UTC
[jira] [Created] (KAFKA-7036) Complete the docs of KafkaConsumer#poll
Chia-Ping Tsai created KAFKA-7036:
-------------------------------------
Summary: Complete the docs of KafkaConsumer#poll
Key: KAFKA-7036
URL: https://issues.apache.org/jira/browse/KAFKA-7036
Project: Kafka
Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai
KafkaConsumer#poll has good documentation of the exceptions it is expected to throw. However, it lacks a description of SerializationException. Another minor issue is that KafkaConsumer doesn't catch every type of exception that may be thrown by the deserializer (see below). We should catch Throwable instead of RuntimeException so that all exceptions are caught and then wrapped in a SerializationException.
{code:java}
private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                                         RecordBatch batch,
                                         Record record) {
    try {
        long offset = record.offset();
        long timestamp = record.timestamp();
        TimestampType timestampType = batch.timestampType();
        Headers headers = new RecordHeaders(record.headers());
        ByteBuffer keyBytes = record.key();
        byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
        K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
        ByteBuffer valueBytes = record.value();
        byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
        V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
        return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
                                    timestamp, timestampType, record.checksumOrNull(),
                                    keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
                                    valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
                                    key, value, headers);
    } catch (RuntimeException e) {
        throw new SerializationException("Error deserializing key/value for partition " + partition +
                " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
    }
}
{code}
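
To illustrate the gap, here is a minimal, self-contained sketch (not the actual Kafka code). The names WrapSketch, its nested SerializationException and Deserializer, and the two parse methods are hypothetical stand-ins introduced only for this example. It assumes a deserializer that fails with an Error (a LinkageError here), which is a Throwable but not a RuntimeException, so the narrower catch lets it escape unwrapped.

```java
// Hypothetical stand-ins for illustration only; these are NOT the real Kafka classes.
class WrapSketch {

    /** Stand-in for Kafka's SerializationException (assumed to be unchecked). */
    static class SerializationException extends RuntimeException {
        SerializationException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Simplified stand-in for a deserializer: topic and raw bytes in, value out. */
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    /** Mirrors the snippet above: only RuntimeException is caught and wrapped. */
    static <T> T parseCatchingRuntimeException(Deserializer<T> d, String topic, byte[] data) {
        try {
            return d.deserialize(topic, data);
        } catch (RuntimeException e) {
            throw new SerializationException("Error deserializing record for topic " + topic, e);
        }
    }

    /** Mirrors the proposal: every Throwable is caught and wrapped. */
    static <T> T parseCatchingThrowable(Deserializer<T> d, String topic, byte[] data) {
        try {
            return d.deserialize(topic, data);
        } catch (Throwable t) {
            throw new SerializationException("Error deserializing record for topic " + topic, t);
        }
    }

    public static void main(String[] args) {
        // A deserializer that fails with an Error rather than a RuntimeException.
        Deserializer<String> failing = (topic, data) -> {
            throw new LinkageError("class could not be linked");
        };

        try {
            parseCatchingRuntimeException(failing, "demo", new byte[0]);
        } catch (SerializationException e) {
            System.out.println("RuntimeException catch: wrapped " + e.getCause());
        } catch (Error e) {
            System.out.println("RuntimeException catch: escaped unwrapped " + e);
        }

        try {
            parseCatchingThrowable(failing, "demo", new byte[0]);
        } catch (SerializationException e) {
            System.out.println("Throwable catch: wrapped " + e.getCause());
        }
    }
}
```

One trade-off of the broader catch is that it also wraps errors such as OutOfMemoryError, so callers that need to distinguish them would have to inspect the cause of the SerializationException.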