Posted to notifications@logging.apache.org by "Gary Gregory (Jira)" <ji...@apache.org> on 2019/08/21 20:35:00 UTC

[jira] [Commented] (LOG4J2-2678) Add LogEvent timestamp to ProducerRecord in KafkaAppender

    [ https://issues.apache.org/jira/browse/LOG4J2-2678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16912661#comment-16912661 ] 

Gary Gregory commented on LOG4J2-2678:
--------------------------------------

Hi [~fedexist],

The best way to go about this is to create a PR on GitHub.

> Add LogEvent timestamp to ProducerRecord in KafkaAppender
> ---------------------------------------------------------
>
>                 Key: LOG4J2-2678
>                 URL: https://issues.apache.org/jira/browse/LOG4J2-2678
>             Project: Log4j 2
>          Issue Type: New Feature
>          Components: Appenders
>            Reporter: Federico D'Ambrosio
>            Priority: Minor
>
> As I mentioned on the mailing list, I'm creating this issue to propose sending the LogEvent time as the record timestamp when using the Log4j KafkaAppender.
> As far as I've seen in the source code, the message is sent by KafkaManager:
> {code:java}
> private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException {
>     final Layout<? extends Serializable> layout = getLayout();
>     byte[] data;
>     if (layout instanceof SerializedLayout) {
>         final byte[] header = layout.getHeader();
>         final byte[] body = layout.toByteArray(event);
>         data = new byte[header.length + body.length];
>         System.arraycopy(header, 0, data, 0, header.length);
>         System.arraycopy(body, 0, data, header.length, body.length);
>     } else {
>         data = layout.toByteArray(event);
>     }
>     manager.send(data); // manager.send(data, event.getTimeMillis())
> }
> {code}
> where manager.send() is implemented as follows (note the creation of the ProducerRecord):
> {code:java}
> public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
>     if (producer != null) {
>         byte[] newKey = null;
>         if (key != null && key.contains("${")) {
>             newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key).getBytes(StandardCharsets.UTF_8);
>         } else if (key != null) {
>             newKey = key.getBytes(StandardCharsets.UTF_8);
>         }
>         final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, newKey, msg);
>         if (syncSend) {
>             final Future<RecordMetadata> response = producer.send(newRecord);
>             response.get(timeoutMillis, TimeUnit.MILLISECONDS);
>         } else {
>             producer.send(newRecord, new Callback() {
>                 @Override
>                 public void onCompletion(final RecordMetadata metadata, final Exception e) {
>                     if (e != null) {
>                         LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e);
>                     }
>                 }
>             });
>         }
>     }
> }
> {code}
> Now, ProducerRecord has constructors that take additional parameters; in particular, I'm looking at [this constructor|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html#ProducerRecord-java.lang.String-java.lang.Integer-java.lang.Long-K-V- ]:
> {code:java}
> public ProducerRecord(java.lang.String topic,
>                       java.lang.Integer partition,
>                       java.lang.Long timestamp,
>                       K key,
>                       V value)
> {code}
> which would allow us to set the timestamp to {{LogEvent#getTimeMillis()}}, but would also force us to specify the partition the record should be sent to. However, the partitioning logic in KafkaProducer is such that if {{partition}} is null, the configured partitioner is used (DefaultPartitioner or the one set by the '{{partitioner.class}}' property), so we could simply pass null.
> In terms of interface, we could add a single flag in the KafkaAppender definition, something like:
> {code:xml}
> <Kafka name="kafka-appender" topic="topic" timestamp="true"> </Kafka>
> {code}
> If the 'timestamp' flag is false, then the record would be sent with the timestamp parameter of the ProducerRecord as null, leaving the behaviour as it is right now.
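
For illustration, the record construction proposed above could look roughly like the sketch below. This is only a sketch under assumptions, not the actual Log4j change: RecordSketch is a hypothetical local stand-in for Kafka's ProducerRecord (so the snippet compiles without the Kafka client on the classpath), and TimestampedSendSketch, buildRecord and the sendTimestamp parameter are made-up names for the 'timestamp' flag logic.

```java
// Hypothetical stand-in for Kafka's ProducerRecord. It mirrors only the
// (topic, partition, timestamp, key, value) constructor quoted in the issue.
final class RecordSketch {
    final String topic;
    final Integer partition; // null: the configured partitioner picks the partition
    final Long timestamp;    // null: no explicit timestamp is passed with the record
    final byte[] key;
    final byte[] value;

    RecordSketch(final String topic, final Integer partition, final Long timestamp,
                 final byte[] key, final byte[] value) {
        this.topic = topic;
        this.partition = partition;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
    }
}

// Hypothetical helper showing how the 'timestamp' flag could drive record creation.
final class TimestampedSendSketch {
    private TimestampedSendSketch() {
    }

    // sendTimestamp models the proposed 'timestamp' attribute of the appender;
    // eventTimeMillis would come from LogEvent#getTimeMillis().
    static RecordSketch buildRecord(final String topic, final byte[] key, final byte[] msg,
                                    final boolean sendTimestamp, final long eventTimeMillis) {
        // Flag off: pass null, which keeps today's behaviour.
        final Long timestamp = sendTimestamp ? Long.valueOf(eventTimeMillis) : null;
        // The partition is always null, so partitioning is unchanged.
        return new RecordSketch(topic, null, timestamp, key, msg);
    }
}
```

With the real client, the last line of buildRecord would presumably become new ProducerRecord<>(topic, null, timestamp, key, msg), using the five-argument constructor quoted in the issue.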



--
This message was sent by Atlassian Jira
(v8.3.2#803003)