Posted to dev@flume.apache.org by "dengkai (JIRA)" <ji...@apache.org> on 2017/01/18 03:48:26 UTC

[jira] [Created] (FLUME-3043) KafkaSink SinkCallback throws NullPointerException when Log4J level is debug

dengkai created FLUME-3043:
------------------------------

             Summary: KafkaSink SinkCallback throws NullPointerException when Log4J level is debug
                 Key: FLUME-3043
                 URL: https://issues.apache.org/jira/browse/FLUME-3043
             Project: Flume
          Issue Type: Bug
          Components: Sinks+Sources
    Affects Versions: v1.7.0
            Reporter: dengkai
            Assignee: dengkai
             Fix For: v1.8.0


When an event is sent to Kafka with SinkCallback while the log4j level is DEBUG,
and Kafka responds with an exception instead of a RecordMetadata,
the Kafka Sink throws an NPE: the debug logging in the callback dereferences
metadata, which is null in that case.
Code in SinkCallback:
if (logger.isDebugEnabled()) {
      long eventElapsedTime = System.currentTimeMillis() - startTime;
      logger.debug("Acked message partition:{} ofset:{}",  metadata.partition(), metadata.offset());
      logger.debug("Elapsed time for send: {}", eventElapsedTime);
}
Code in Kafka Producer:
if (exception == null) {
    // If the timestamp returned by server is NoTimestamp, that means CreateTime is used. Otherwise LogAppendTime is used.
    RecordMetadata metadata = new RecordMetadata(this.topicPartition, baseOffset, thunk.future.relativeOffset(),
                                                 timestamp == Record.NO_TIMESTAMP ? thunk.future.timestamp() : timestamp,
                                                 thunk.future.checksum(),
                                                 thunk.future.serializedKeySize(),
                                                 thunk.future.serializedValueSize());
    thunk.callback.onCompletion(metadata, null);
} else {
    thunk.callback.onCompletion(null, exception);
}
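
One possible guard, sketched here under assumptions and not necessarily the change applied for FLUME-3043: check for the failure case before touching metadata. To keep the sketch self-contained, MetadataStub is a hypothetical stand-in for Kafka's RecordMetadata, and describeCompletion is a hypothetical helper that returns the text a callback might log, taking the same (metadata, exception) pair that onCompletion receives.

```java
// Hypothetical stand-in for org.apache.kafka.clients.producer.RecordMetadata,
// reduced to the two accessors used by the debug logging above.
final class MetadataStub {
    private final int partition;
    private final long offset;

    MetadataStub(int partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }

    int partition() { return partition; }
    long offset() { return offset; }
}

final class NullSafeCallbackSketch {
    // Builds the message a callback could log. The producer code above passes
    // metadata == null whenever exception != null, so the failure case is
    // handled first and metadata is only dereferenced on success.
    static String describeCompletion(MetadataStub metadata, Exception exception, long elapsedMillis) {
        if (exception != null || metadata == null) {
            String reason = (exception != null) ? exception.getMessage() : "no metadata returned";
            return "Send failed: " + reason;
        }
        return "Acked message partition:" + metadata.partition()
                + " offset:" + metadata.offset()
                + " elapsed:" + elapsedMillis + "ms";
    }

    public static void main(String[] args) {
        // Success path: metadata present, exception null.
        System.out.println(describeCompletion(new MetadataStub(0, 42L), null, 5L));
        // Failure path: metadata null, exception present. No NPE is thrown.
        System.out.println(describeCompletion(null, new RuntimeException("broker unavailable"), 5L));
    }
}
```

In a real SinkCallback the same check would wrap the logger.debug calls, so that metadata.partition() and metadata.offset() are only evaluated when metadata is non-null.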



