Posted to dev@logging.apache.org by Federico D'Ambrosio <fe...@gmail.com> on 2019/08/21 07:31:51 UTC

Send LogEvent time as timestamp to Kafka Producer when using KafkaAppender

Hello everyone,

I wanted to ask whether you would consider it useful to add the option of
sending the LogEvent time as the record timestamp when using the log4j
KafkaAppender. I think it would be very useful for anyone using Kafka as a
log aggregator to be able to use the event time rather than the time the
record is sent.
Bear with me, I've only just started looking at the source code of
KafkaAppender and may be overlooking something in the broader scope of log4j.

As far as I can see in the source code, the appender serializes the event in tryAppend and hands it to the KafkaManager:

    private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException {
        final Layout<? extends Serializable> layout = getLayout();
        byte[] data;
        if (layout instanceof SerializedLayout) {
            final byte[] header = layout.getHeader();
            final byte[] body = layout.toByteArray(event);
            data = new byte[header.length + body.length];
            System.arraycopy(header, 0, data, 0, header.length);
            System.arraycopy(body, 0, data, header.length, body.length);
        } else {
            data = layout.toByteArray(event);
        }
        manager.send(data);
    }

with manager.send() implemented as follows (note where the ProducerRecord is created):

    public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
        if (producer != null) {
            byte[] newKey = null;

            if (key != null && key.contains("${")) {
                newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key).getBytes(StandardCharsets.UTF_8);
            } else if (key != null) {
                newKey = key.getBytes(StandardCharsets.UTF_8);
            }

            final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, newKey, msg);
            if (syncSend) {
                final Future<RecordMetadata> response = producer.send(newRecord);
                response.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } else {
                producer.send(newRecord, new Callback() {
                    @Override
                    public void onCompletion(final RecordMetadata metadata, final Exception e) {
                        if (e != null) {
                            LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e);
                        }
                    }
                });
            }
        }
    }


Now, ProducerRecord has constructors with additional parameters; in particular, I'm looking at:
https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html#ProducerRecord-java.lang.String-java.lang.Integer-java.lang.Long-K-V-

public ProducerRecord(java.lang.String topic,
                      java.lang.Integer partition,
                      java.lang.Long timestamp,
                      K key,
                      V value)

which would allow us to set the timestamp to LogEvent#getTimeMillis(), but
would also force us to specify the partition the record should be sent to.
However, the partitioning logic in the KafkaProducer is such that when the
partition is null, the configured partitioner is used (DefaultPartitioner, or
whatever is set via the 'partitioner.class' property), so we could simply
pass null for it.
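
To make the idea concrete, here is a rough sketch of how KafkaManager#send
could carry the event time. The extra Long parameter and its name are
placeholders I'm proposing, not existing code:

    // Sketch only: a possible variant of KafkaManager#send that takes the event time.
    // 'eventTimestamp' would be LogEvent#getTimeMillis() when the feature is on, null otherwise.
    public void send(final byte[] msg, final Long eventTimestamp)
            throws ExecutionException, InterruptedException, TimeoutException {
        if (producer != null) {
            byte[] newKey = null;
            if (key != null && key.contains("${")) {
                newKey = getLoggerContext().getConfiguration().getStrSubstitutor()
                        .replace(key).getBytes(StandardCharsets.UTF_8);
            } else if (key != null) {
                newKey = key.getBytes(StandardCharsets.UTF_8);
            }
            // Partition stays null, so the configured partitioner still picks the partition;
            // only the timestamp argument changes compared to the current code.
            final ProducerRecord<byte[], byte[]> newRecord =
                    new ProducerRecord<>(topic, null, eventTimestamp, newKey, msg);
            if (syncSend) {
                final Future<RecordMetadata> response = producer.send(newRecord);
                response.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } else {
                producer.send(newRecord, (metadata, e) -> {
                    if (e != null) {
                        LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e);
                    }
                });
            }
        }
    }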

In terms of interface, we could add a single flag in the KafkaAppender
definition, something like:

<Kafka name="kafka-appender" topic="topic" timestamp="true"> </Kafka>

If the 'timestamp' flag is false, the record would be sent with the
ProducerRecord's timestamp parameter set to null, leaving the behaviour
exactly as it is now.
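
On the appender side, the wiring could look roughly like this; the field and
plumbing are only illustrative, following the existing plugin-builder pattern:

    // Hypothetical attribute on KafkaAppender.Builder; defaults to false so
    // existing configurations keep the current behaviour.
    @PluginBuilderAttribute
    private boolean timestamp = false;

    // ...and in KafkaAppender#tryAppend (assuming the flag is carried onto the
    // appender), the event time would be forwarded to the manager:
    final Long eventTime = timestamp ? Long.valueOf(event.getTimeMillis()) : null;
    manager.send(data, eventTime);

That keeps the change backwards compatible: with the flag off, null is
forwarded and the record is created exactly as today.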

What do you think? Is this something that has already been discussed?

Thank you for your attention,
Federico

Re: Send LogEvent time as timestamp to Kafka Producer when using KafkaAppender

Posted by Federico D'Ambrosio <fe...@gmail.com>.
Hi, to follow up, I've created the JIRA issue here:
https://issues.apache.org/jira/browse/LOG4J2-2678
and implemented the changes on my branch.
Still, I'm having trouble running the tests; is there any resource I can
look at to see the correct way to add a new test?

I'm asking because I've noticed that KafkaAppenderTest references a file
named KafkaAppenderTest.xml containing a log4j configuration, so I added a
new KafkaAppender to that file:

<Configuration name="KafkaAppenderTest" status="OFF">
  <Appenders>
    <Kafka name="KafkaAppenderWithTimestamp" topic="kafka-topic" key="key">
      <PatternLayout pattern="%m"/>
      <Property name="timeout.ms">1000</Property>
      <Property name="bootstrap.servers">localhost:9092</Property>
    </Kafka>
  </Appenders>
  <Loggers>
    <Root level="info">
      <AppenderRef ref="KafkaAppenderWithTimestamp"/>
    </Root>
  </Loggers>
</Configuration>

but apparently I've done something wrong, because when I run the test I added
(which uses ctx.getRequiredAppender("KafkaAppenderWithTimestamp")), I get the
error:

Appender named KafkaAppenderWithTimestamp was null.

So, evidently, I'm missing something here.
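
For reference, the test I added is roughly shaped like this; it mirrors what
I see in KafkaAppenderTest, and apart from the appender name everything here
(rule usage, test and logger names) is my own guess rather than exact code:

    // Rough sketch of the new test: assumes the "KafkaAppenderWithTimestamp"
    // appender from the configuration above and the LoggerContextRule pattern
    // already used by KafkaAppenderTest.
    @ClassRule
    public static final LoggerContextRule ctx = new LoggerContextRule("KafkaAppenderTest.xml");

    @Test
    public void testAppendWithTimestamp() throws Exception {
        final Appender appender = ctx.getRequiredAppender("KafkaAppenderWithTimestamp");
        final Logger logger = ctx.getLogger("timestamp-test");
        logger.info("simple message with event time");
        // assertions on the produced record's timestamp would go here
    }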

Thank you very much,
Federico

On Wed, 21 Aug 2019 at 20:34, Federico D'Ambrosio <fedexist@gmail.com> wrote:

> Hi Ralph,
>
> thank you for your response, I'll be creating the issue shortly.
>
>
>
> On Wed, 21 Aug 2019 at 13:37, Apache <ralph.goers@dslextreme.com> wrote:
>
>> It hasn’t been discussed. Feel free to create a Jira issue and a pull
>> request. Please make sure you include a test for your change.
>>
>> Ralph
>>
>
> --
> Federico D'Ambrosio
>


-- 
Federico D'Ambrosio

Re: Send LogEvent time as timestamp to Kafka Producer when using KafkaAppender

Posted by Federico D'Ambrosio <fe...@gmail.com>.
Hi Ralph,

thank you for your response, I'll be creating the issue shortly.



On Wed, 21 Aug 2019 at 13:37, Apache <ra...@dslextreme.com> wrote:

> It hasn’t been discussed. Feel free to create a Jira issue and a pull
> request. Please make sure you include a test for your change.
>
> Ralph
>

-- 
Federico D'Ambrosio

Re: Send LogEvent time as timestamp to Kafka Producer when using KafkaAppender

Posted by Apache <ra...@dslextreme.com>.
It hasn’t been discussed. Feel free to create a Jira issue and a pull request. Please make sure you include a test for your change. 

Ralph
