Posted to dev@kafka.apache.org by "Julian Reichinger (Jira)" <ji...@apache.org> on 2021/09/08 13:54:00 UTC
[jira] [Created] (KAFKA-13278) Deserialization behavior of the
Fetcher class does not match up with API contract of the Deserializer
interface
Julian Reichinger created KAFKA-13278:
-----------------------------------------
Summary: Deserialization behavior of the Fetcher class does not match up with API contract of the Deserializer interface
Key: KAFKA-13278
URL: https://issues.apache.org/jira/browse/KAFKA-13278
Project: Kafka
Issue Type: Bug
Components: clients, documentation
Affects Versions: 2.6.0
Reporter: Julian Reichinger
The documentation of the
{noformat}
org.apache.kafka.common.serialization.Deserializer{noformat}
interface states that implementations have to expect null byte-arrays and should handle them in a meaningful way.
However, at least in the Kafka client, a null value can never actually reach a deserializer, because the class
{noformat}
org.apache.kafka.clients.consumer.internals.Fetcher{noformat}
does not call the registered deserializer when the key or value is null:
{code:java}
private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                                         RecordBatch batch,
                                         Record record) {
    try {
        long offset = record.offset();
        long timestamp = record.timestamp();
        Optional<Integer> leaderEpoch = maybeLeaderEpoch(batch.partitionLeaderEpoch());
        TimestampType timestampType = batch.timestampType();
        Headers headers = new RecordHeaders(record.headers());
        ByteBuffer keyBytes = record.key();
        byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
        K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
        ByteBuffer valueBytes = record.value();
        byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
        V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
        return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
                                    timestamp, timestampType, record.checksumOrNull(),
                                    keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
                                    valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
                                    key, value, headers, leaderEpoch);
    } catch (RuntimeException e) {
        throw new SerializationException("Error deserializing key/value for partition " + partition +
            " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
    }
}
{code}
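The effect of the null guard can be reproduced without a broker. The following is a minimal sketch, not Kafka code: SimpleDeserializer is a hypothetical stand-in for the Deserializer interface, and parseValue mirrors only the "valueBytes == null ? null : ..." expression from parseRecord. The names NullSkipDemo, parseValue and the "<deleted>" sentinel are assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for org.apache.kafka.common.serialization.Deserializer,
// reduced to the single method that matters for this illustration.
interface SimpleDeserializer<T> {
    T deserialize(String topic, byte[] data);
}

final class NullSkipDemo {
    // Mirrors the guard in Fetcher.parseRecord: the deserializer is bypassed for null input.
    static <T> T parseValue(SimpleDeserializer<T> deserializer, String topic, byte[] valueBytes) {
        return valueBytes == null ? null : deserializer.deserialize(topic, valueBytes);
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();

        // Maps null input to a sentinel, as the Deserializer documentation
        // invites implementations to do.
        SimpleDeserializer<String> tombstoneAware = (topic, data) -> {
            calls.incrementAndGet();
            return data == null ? "<deleted>" : new String(data, StandardCharsets.UTF_8);
        };

        String present = parseValue(tombstoneAware, "demo-topic", "v1".getBytes(StandardCharsets.UTF_8));
        String tombstone = parseValue(tombstoneAware, "demo-topic", null);

        System.out.println(present);      // v1
        System.out.println(tombstone);    // null, not "<deleted>"
        System.out.println(calls.get());  // 1: the deserializer never saw the null
    }
}
```

Under these assumptions, the null-handling branch inside the deserializer is dead code: the caller decides the result for null input before the deserializer is consulted.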
I implemented an ErrorHandlingDeserializer which I use to wrap the actual deserializers and which records the result (value or exception) in a container object.
{code:java}
/**
 * Handles exceptions during de-serializations thrown by a delegate {@link Deserializer}.
 *
 * @param <T> type of the deserialized object
 */
final class ErrorHandlingDeserializer<T> implements Deserializer<ReadResult<T>> {

    private final Deserializer<Envelope<T>> delegate;

    private ErrorHandlingDeserializer(Deserializer<Envelope<T>> delegate) {
        this.delegate = requireNonNull(delegate);
    }

    static <T> ErrorHandlingDeserializer<T> wrap(Deserializer<Envelope<T>> delegate) {
        return new ErrorHandlingDeserializer<>(delegate);
    }

    @Override
    public ReadResult<T> deserialize(String topic, @Nullable byte[] data) {
        try {
            return ReadResult.successful(delegate.deserialize(topic, data));
        } catch (Exception e) {
            return ReadResult.failed(e);
        }
    }
}
{code}
This deserializer itself never returns null. However, because the Fetcher bypasses it for null payloads, I still have to check for null values in the consumer records at every usage. In addition, I have to check for a null value inside the ReadResult container class, because the Deserializer API says null input is possible and I have no guarantee that the Fetcher behavior will never change.
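The two defensive checks can be sketched roughly as follows. This is an illustration under assumptions, not the code from my project: the ReadResult shown here is a simplified, hypothetical container (only its successful(...) and failed(...) factories appear above, and the Envelope wrapper is left out), and DefensiveConsumer.describe is an invented helper whose argument stands for the result of ConsumerRecord.value().

```java
import java.util.Objects;

// Hypothetical, simplified container: holds either a (possibly null) value or an exception.
final class ReadResult<T> {
    private final T value;          // null if the delegate returned null
    private final Exception error;  // non-null only for failed results

    private ReadResult(T value, Exception error) {
        this.value = value;
        this.error = error;
    }

    static <T> ReadResult<T> successful(T value) {
        return new ReadResult<>(value, null);
    }

    static <T> ReadResult<T> failed(Exception e) {
        return new ReadResult<>(null, Objects.requireNonNull(e));
    }

    boolean isFailed() { return error != null; }
    T value() { return value; }
    Exception error() { return error; }
}

final class DefensiveConsumer {
    // recordValue stands for ConsumerRecord.value() of a record read through
    // the wrapping deserializer.
    static <T> String describe(ReadResult<T> recordValue) {
        // Check 1: needed today, because the Fetcher skips the deserializer for null payloads.
        if (recordValue == null) {
            return "tombstone (deserializer skipped)";
        }
        if (recordValue.isFailed()) {
            return "failed: " + recordValue.error().getMessage();
        }
        // Check 2: needed if the Fetcher ever starts passing null to the deserializer
        // and the delegate maps it to null.
        if (recordValue.value() == null) {
            return "tombstone (delegate returned null)";
        }
        return "value: " + recordValue.value();
    }

    public static void main(String[] args) {
        System.out.println(describe(null));
        System.out.println(describe(ReadResult.<String>successful(null)));
        System.out.println(describe(ReadResult.successful("a")));
        System.out.println(describe(ReadResult.<String>failed(new IllegalStateException("bad bytes"))));
    }
}
```

With a firm guarantee in either direction, one of the two tombstone branches could be deleted.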
In my opinion this behavior is a bug, because everyone implementing a Deserializer would expect to actually receive null values (for example in case of deletions). The client should guarantee either that null values are always passed to Deserializers or that they never are.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)