Posted to dev@kafka.apache.org by "Chia-Ping Tsai (JIRA)" <ji...@apache.org> on 2018/06/11 10:38:00 UTC

[jira] [Created] (KAFKA-7036) Complete the docs of KafkaConsumer#poll

Chia-Ping Tsai created KAFKA-7036:
-------------------------------------

             Summary: Complete the docs of KafkaConsumer#poll
                 Key: KAFKA-7036
                 URL: https://issues.apache.org/jira/browse/KAFKA-7036
             Project: Kafka
          Issue Type: Improvement
            Reporter: Chia-Ping Tsai
            Assignee: Chia-Ping Tsai


The Javadoc of KafkaConsumer#poll describes most of the exceptions it can throw, but it omits SerializationException. A related minor issue is that KafkaConsumer does not catch every type of exception a deserializer may throw (see below): the catch clause handles only RuntimeException, so an Error raised inside a deserializer propagates unwrapped. We should catch Throwable instead of RuntimeException so that all exceptions are caught and wrapped in a SerializationException.
{code:java}
private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                                         RecordBatch batch,
                                         Record record) {
    try {
        long offset = record.offset();
        long timestamp = record.timestamp();
        TimestampType timestampType = batch.timestampType();
        Headers headers = new RecordHeaders(record.headers());
        ByteBuffer keyBytes = record.key();
        byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
        K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
        ByteBuffer valueBytes = record.value();
        byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
        V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
        return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
                                    timestamp, timestampType, record.checksumOrNull(),
                                    keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
                                    valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
                                    key, value, headers);
    } catch (RuntimeException e) {
        throw new SerializationException("Error deserializing key/value for partition " + partition +
                " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
    }
}{code}
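
To illustrate the difference between the two catch clauses, here is a minimal, self-contained sketch. It does not use the Kafka classes: the nested SerializationException and Deserializer types are simplified local stand-ins invented for this example, and the two parse methods are hypothetical reductions of parseRecord to a single deserialize call. A deserializer that throws an Error (here a StackOverflowError) slips past the RuntimeException catch but is wrapped by the Throwable catch.

```java
import java.nio.charset.StandardCharsets;

public class DeserializeWrapSketch {

    // Local stand-in (assumption) for org.apache.kafka.common.errors.SerializationException.
    static class SerializationException extends RuntimeException {
        SerializationException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Simplified stand-in (assumption) for a deserializer; the headers argument is omitted.
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    // Mirrors the catch clause in the snippet above: only RuntimeException is wrapped.
    static <T> T parseCatchingRuntimeException(Deserializer<T> deserializer, String topic, byte[] data) {
        try {
            return data == null ? null : deserializer.deserialize(topic, data);
        } catch (RuntimeException e) {
            throw new SerializationException("Error deserializing record for topic " + topic, e);
        }
    }

    // The change proposed in this issue: catch Throwable so that Errors are wrapped too.
    static <T> T parseCatchingThrowable(Deserializer<T> deserializer, String topic, byte[] data) {
        try {
            return data == null ? null : deserializer.deserialize(topic, data);
        } catch (Throwable t) {
            throw new SerializationException("Error deserializing record for topic " + topic, t);
        }
    }

    public static void main(String[] args) {
        // A deserializer that fails with an Error rather than a RuntimeException.
        Deserializer<String> failing = (topic, data) -> {
            throw new StackOverflowError("deeply nested payload");
        };
        byte[] payload = "x".getBytes(StandardCharsets.UTF_8);

        try {
            parseCatchingRuntimeException(failing, "demo", payload);
        } catch (SerializationException e) {
            System.out.println("wrapped (not expected for an Error)");
        } catch (StackOverflowError e) {
            System.out.println("escaped unwrapped: " + e.getClass().getSimpleName());
        }

        try {
            parseCatchingThrowable(failing, "demo", payload);
        } catch (SerializationException e) {
            System.out.println("wrapped, cause: " + e.getCause().getClass().getSimpleName());
        }
    }
}
```

One trade-off to keep in mind: catching Throwable also wraps fatal errors such as OutOfMemoryError, so callers that need to react to those would have to inspect the cause of the SerializationException.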


