Posted to notifications@logging.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2019/12/08 06:32:00 UTC

[jira] [Work logged] (LOG4J2-2678) Add LogEvent timestamp to ProducerRecord in KafkaAppender

     [ https://issues.apache.org/jira/browse/LOG4J2-2678?focusedWorklogId=355760&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355760 ]

ASF GitHub Bot logged work on LOG4J2-2678:
------------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Dec/19 06:31
            Start Date: 08/Dec/19 06:31
    Worklog Time Spent: 10m 
      Work Description: rgoers commented on issue #306: [LOG4J2-2678] Add LogEvent timestamp to Kafka ProducerRecord
URL: https://github.com/apache/logging-log4j2/pull/306#issuecomment-562917363
 
 
   I applied this to master, and when I ran the test I got:
   ```
   [INFO] Running org.apache.logging.log4j.kafka.appender.KafkaAppenderTest
   [ERROR] Tests run: 5, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.442 s <<< FAILURE! - in org.apache.logging.log4j.kafka.appender.KafkaAppenderTest
   [ERROR] testAppenderNoEventTimestamp(org.apache.logging.log4j.kafka.appender.KafkaAppenderTest)  Time elapsed: 0.025 s  <<< FAILURE!
   java.lang.AssertionError: Appender named KafkaAppenderNoEventTimestamp was null.
   	at org.apache.logging.log4j.kafka.appender.KafkaAppenderTest.testAppenderNoEventTimestamp(KafkaAppenderTest.java:155)
   
   [INFO] 
   [INFO] Results:
   [INFO] 
   [ERROR] Failures: 
   [ERROR]   KafkaAppenderTest.testAppenderNoEventTimestamp:155 Appender named KafkaAppenderNoEventTimestamp was null.
   [INFO] 
   [ERROR] Tests run: 5, Failures: 1, Errors: 0, Skipped: 0
   
   ```
   Also, if you would like this as part of a Log4j 2.x release please provide an equivalent patch to the release-2.x branch.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 355760)
    Time Spent: 2.5h  (was: 2h 20m)

> Add LogEvent timestamp to ProducerRecord in KafkaAppender
> ---------------------------------------------------------
>
>                 Key: LOG4J2-2678
>                 URL: https://issues.apache.org/jira/browse/LOG4J2-2678
>             Project: Log4j 2
>          Issue Type: New Feature
>          Components: Appenders
>            Reporter: Federico D'Ambrosio
>            Priority: Minor
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> As I mentioned on the mailing list, I'm creating this issue to propose sending the LogEvent time as the record timestamp when using the Log4j KafkaAppender.
> As far as I've seen in the source code, the message is sent by KafkaManager:
> {code:java}
> private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException {
>     final Layout<? extends Serializable> layout = getLayout();
>     byte[] data;
>     if (layout instanceof SerializedLayout) {
>         final byte[] header = layout.getHeader();
>         final byte[] body = layout.toByteArray(event);
>         data = new byte[header.length + body.length];
>         System.arraycopy(header, 0, data, 0, header.length);
>         System.arraycopy(body, 0, data, header.length, body.length);
>     } else {
>         data = layout.toByteArray(event);
>     }
>     manager.send(data); // manager.send(data, event.getTimeMillis())
> }
> {code}
> manager.send() is implemented as follows; note where the ProducerRecord is created:
> {code:java}
> public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
>     if (producer != null) {
>         byte[] newKey = null;
>         if (key != null && key.contains("${")) {
>             newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key).getBytes(StandardCharsets.UTF_8);
>         } else if (key != null) {
>             newKey = key.getBytes(StandardCharsets.UTF_8);
>         }
>         final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, newKey, msg);
>         if (syncSend) {
>             final Future<RecordMetadata> response = producer.send(newRecord);
>             response.get(timeoutMillis, TimeUnit.MILLISECONDS);
>         } else {
>             producer.send(newRecord, new Callback() {
>                 @Override
>                 public void onCompletion(final RecordMetadata metadata, final Exception e) {
>                     if (e != null) {
>                         LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e);
>                     }
>                 }
>             });
>         }
>     }
> }
> {code}
> ProducerRecord also has constructors that take additional parameters. In particular, I'm looking at [this constructor|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html#ProducerRecord-java.lang.String-java.lang.Integer-java.lang.Long-K-V- ]:
> {code:java}
> public ProducerRecord(java.lang.String topic,
>                       java.lang.Integer partition,
>                       java.lang.Long timestamp,
>                       K key,
>                       V value)
> {code}
> which would allow us to set the timestamp to {{LogEvent#getTimeMillis()}}, but would also require us to pass the partition the record should be sent to. However, the partitioning logic in KafkaProducer is such that if {{partition}} is null, the configured partitioner is used (DefaultPartitioner or the one defined by the '{{partitioner.class}}' property), so we could simply pass null.
> In terms of interface, we could add a single flag in the KafkaAppender definition, something like:
> {code:xml}
> <Kafka name="kafka-appender" topic="topic" timestamp="true"> </Kafka>
> {code}
> If the 'timestamp' flag is false, then the record would be sent with the timestamp parameter of the ProducerRecord as null, leaving the behaviour as it is right now.
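
The flag behaviour described in the issue can be sketched roughly as follows. This is only an illustration with no Kafka dependency: `SketchRecord` is a hypothetical stand-in that mirrors the (topic, partition, timestamp, key, value) parameter order of the constructor quoted above, and `buildRecord` and `sendEventTimestamp` are made-up names, not part of Log4j or of the patch in the pull request.

```java
import java.nio.charset.StandardCharsets;

public class TimestampFlagSketch {

    // Hypothetical stand-in for ProducerRecord<byte[], byte[]>; it only mirrors
    // the (topic, partition, timestamp, key, value) parameter order.
    static final class SketchRecord {
        final String topic;
        final Integer partition;
        final Long timestamp;
        final byte[] key;
        final byte[] value;

        SketchRecord(final String topic, final Integer partition, final Long timestamp,
                     final byte[] key, final byte[] value) {
            this.topic = topic;
            this.partition = partition;
            this.timestamp = timestamp;
            this.key = key;
            this.value = value;
        }
    }

    // Illustrative helper (assumed name): passes the event time only when the flag is set.
    static SketchRecord buildRecord(final String topic, final byte[] key, final byte[] msg,
                                    final boolean sendEventTimestamp, final long eventTimeMillis) {
        // A null timestamp corresponds to the current behaviour described in the issue.
        final Long timestamp = sendEventTimestamp ? Long.valueOf(eventTimeMillis) : null;
        // A null partition leaves the choice to the configured partitioner.
        return new SketchRecord(topic, null, timestamp, key, msg);
    }

    public static void main(final String[] args) {
        final byte[] msg = "hello".getBytes(StandardCharsets.UTF_8);
        final long eventTimeMillis = 1575786660000L; // arbitrary example value

        final SketchRecord withFlag = buildRecord("topic", null, msg, true, eventTimeMillis);
        final SketchRecord withoutFlag = buildRecord("topic", null, msg, false, eventTimeMillis);

        System.out.println("timestamp=\"true\"  -> " + withFlag.timestamp);
        System.out.println("timestamp=\"false\" -> " + withoutFlag.timestamp);
    }
}
```

In an actual change, the same two null-or-value decisions would presumably be made where KafkaManager creates the ProducerRecord, with the event time passed in from tryAppend.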



--
This message was sent by Atlassian Jira
(v8.3.4#803005)