You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flume.apache.org by "dengkai (JIRA)" <ji...@apache.org> on 2017/01/18 03:48:26 UTC
[jira] [Created] (FLUME-3043) KafkaSink SinkCallback throw
NullPointerException when Log4J leval is debug
dengkai created FLUME-3043:
------------------------------
Summary: KafkaSink SinkCallback throw NullPointerException when Log4J leval is debug
Key: FLUME-3043
URL: https://issues.apache.org/jira/browse/FLUME-3043
Project: Flume
Issue Type: Bug
Components: Sinks+Sources
Affects Versions: v1.7.0
Reporter: dengkai
Assignee: dengkai
Fix For: v1.8.0
When we send a event with SinkCallback to kafka under DEBUG level of log4j,
if kafka reponses a result with exception but not a RecordMetadata.
Kafka Sink will throw NPE.
code in SinkCallback:
if (logger.isDebugEnabled()) {
long eventElapsedTime = System.currentTimeMillis() - startTime;
logger.debug("Acked message partition:{} ofset:{}", metadata.partition(), metadata.offset());
logger.debug("Elapsed time for send: {}", eventElapsedTime);
}
code in Kafka Producer:
if (exception == null) {
// If the timestamp returned by server is NoTimestamp, that means CreateTime is used. Otherwise LogAppendTime is used.
RecordMetadata metadata = new RecordMetadata(this.topicPartition, baseOffset, thunk.future.relativeOffset(),
timestamp == Record.NO_TIMESTAMP ? thunk.future.timestamp() : timestamp,
thunk.future.checksum(),
thunk.future.serializedKeySize(),
thunk.future.serializedValueSize());
thunk.callback.onCompletion(metadata, null);
} else {
thunk.callback.onCompletion(null, exception);
}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)