Posted to dev@logging.apache.org by Federico D'Ambrosio <fe...@gmail.com> on 2019/08/21 07:31:51 UTC
Send LogEvent time as timestamp to Kafka Producer when using KafkaAppender
Hello everyone,
I wanted to ask whether it would be possible, and whether you would consider it useful, to send the LogEvent time as the record timestamp when using the log4j KafkaAppender. I think it would be very useful for anyone using Kafka as a log aggregator to be able to use the event time rather than the time the record is sent.
Bear with me: I've only just started looking at the source code of KafkaAppender and may be overlooking something in the broader scope of log4j.
As far as I can see in the source code, the message is sent by KafkaManager:
    private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException {
        final Layout<? extends Serializable> layout = getLayout();
        byte[] data;
        if (layout instanceof SerializedLayout) {
            final byte[] header = layout.getHeader();
            final byte[] body = layout.toByteArray(event);
            data = new byte[header.length + body.length];
            System.arraycopy(header, 0, data, 0, header.length);
            System.arraycopy(body, 0, data, header.length, body.length);
        } else {
            data = layout.toByteArray(event);
        }
        manager.send(data);
    }
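The SerializedLayout branch above is just a byte-array concatenation of the layout header and the serialized event body. As a standalone illustration (the class name is hypothetical, not part of log4j):

```java
import java.nio.charset.StandardCharsets;

public class ConcatDemo {

    // Joins a layout header and a serialized event body into one payload,
    // exactly as the SerializedLayout branch does: allocate header+body
    // bytes, then copy each part into place with System.arraycopy.
    static byte[] concat(byte[] header, byte[] body) {
        byte[] data = new byte[header.length + body.length];
        System.arraycopy(header, 0, data, 0, header.length);
        System.arraycopy(body, 0, data, header.length, body.length);
        return data;
    }

    public static void main(String[] args) {
        byte[] joined = concat("HDR".getBytes(StandardCharsets.UTF_8),
                               "event".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(joined, StandardCharsets.UTF_8)); // prints "HDRevent"
    }
}
```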
with manager.send() implemented as follows; note the line where the ProducerRecord is created:
    public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
        if (producer != null) {
            byte[] newKey = null;

            if (key != null && key.contains("${")) {
                newKey = getLoggerContext().getConfiguration().getStrSubstitutor()
                        .replace(key).getBytes(StandardCharsets.UTF_8);
            } else if (key != null) {
                newKey = key.getBytes(StandardCharsets.UTF_8);
            }

            final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, newKey, msg);
            if (syncSend) {
                final Future<RecordMetadata> response = producer.send(newRecord);
                response.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } else {
                producer.send(newRecord, new Callback() {
                    @Override
                    public void onCompletion(final RecordMetadata metadata, final Exception e) {
                        if (e != null) {
                            LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e);
                        }
                    }
                });
            }
        }
    }
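The key handling above expands ${...} lookups through Log4j's StrSubstitutor at send time. Here is a simplified, self-contained sketch of that branch; the substitute method and class name are hypothetical stand-ins for the real StrSubstitutor, just to show the shape of the logic:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class KeySubstitutionDemo {

    // Hypothetical, simplified stand-in for StrSubstitutor.replace():
    // expands ${name} using a fixed lookup map.
    static String substitute(String key, Map<String, String> lookup) {
        String out = key;
        for (Map.Entry<String, String> e : lookup.entrySet()) {
            out = out.replace("${" + e.getKey() + "}", e.getValue());
        }
        return out;
    }

    // Mirrors the key branch in send(): substitute only when the key
    // contains a "${" marker, otherwise use the key's bytes directly,
    // and leave a null key as null.
    static byte[] resolveKey(String key, Map<String, String> lookup) {
        if (key != null && key.contains("${")) {
            return substitute(key, lookup).getBytes(StandardCharsets.UTF_8);
        } else if (key != null) {
            return key.getBytes(StandardCharsets.UTF_8);
        }
        return null;
    }

    public static void main(String[] args) {
        byte[] k = resolveKey("${env}", Map.of("env", "prod"));
        System.out.println(new String(k, StandardCharsets.UTF_8)); // prints "prod"
    }
}
```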
Now, ProducerRecord has additional constructors; in particular, I'm looking at:
https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html#ProducerRecord-java.lang.String-java.lang.Integer-java.lang.Long-K-V-
public ProducerRecord(java.lang.String topic,
                      java.lang.Integer partition,
                      java.lang.Long timestamp,
                      K key,
                      V value)
which would allow us to set the timestamp to LogEvent#getTimeMillis(), but would also force us to supply the partition the record should be sent to. Still, the partitioning logic inside KafkaProducer is such that if the partition is null, the configured partitioner is used (DefaultPartitioner, or the one set via the 'partitioner.class' property), so we could simply pass null for it.
In terms of interface, we could add a single flag to the KafkaAppender definition, something like:
<Kafka name="kafka-appender" topic="topic" timestamp="true"> </Kafka>
If the 'timestamp' flag is false, the record would be sent with a null timestamp parameter in the ProducerRecord, leaving the behaviour as it is right now.
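As a sketch of the proposed flag logic (names here are hypothetical; the real change would live in KafkaManager.send, feeding the result into the five-argument ProducerRecord constructor with a null partition):

```java
public class TimestampFlagSketch {

    // Example event time in epoch millis, standing in for LogEvent#getTimeMillis().
    static final long EVENT_TIME_MILLIS = 1566372711000L;

    // Proposed logic: with the 'timestamp' flag on, use the event time;
    // otherwise return null, which preserves today's behaviour (the record
    // gets the producer/broker-assigned time).
    static Long recordTimestamp(boolean sendEventTimestamp, long eventTimeMillis) {
        return sendEventTimestamp ? eventTimeMillis : null;
    }

    public static void main(String[] args) {
        // In KafkaManager.send this value would then feed:
        //   new ProducerRecord<>(topic, /* partition */ null, timestamp, newKey, msg);
        System.out.println(recordTimestamp(true, EVENT_TIME_MILLIS));  // prints 1566372711000
        System.out.println(recordTimestamp(false, EVENT_TIME_MILLIS)); // prints null
    }
}
```

Because the partition argument is null, the configured partitioner still decides the partition exactly as before.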
What do you think? Is this something that has already been discussed?
Thank you for your attention,
Federico
Re: Send LogEvent time as timestamp to Kafka Producer when using KafkaAppender
Posted by Federico D'Ambrosio <fe...@gmail.com>.
Hi, to follow up, I've created the JIRA here:
https://issues.apache.org/jira/browse/LOG4J2-2678
and implemented the changes on my branch.
Still, I'm having trouble running the tests; is there any resource I can consult to see the correct way to add a new test?
I'm asking because I noticed that KafkaAppenderTest references a file named KafkaAppenderTest.xml containing a log4j configuration, so I added a new KafkaAppender to that file:
<Configuration name="KafkaAppenderTest" status="OFF">
  <Appenders>
    <Kafka name="KafkaAppenderWithTimestamp" topic="kafka-topic" key="key">
      <PatternLayout pattern="%m"/>
      <Property name="timeout.ms">1000</Property>
      <Property name="bootstrap.servers">localhost:9092</Property>
    </Kafka>
  </Appenders>
  <Loggers>
    <Root level="info">
      <AppenderRef ref="KafkaAppenderWithTimestamp"/>
    </Root>
  </Loggers>
</Configuration>
but apparently I've done something wrong, because when I run the test I added (which uses ctx.getRequiredAppender("KafkaAppenderWithTimestamp")), I get the error:
Appender named KafkaAppenderWithTimestamp was null.
So, evidently, I'm missing something here.
Thank you very much,
Federico
--
Federico D'Ambrosio
Re: Send LogEvent time as timestamp to Kafka Producer when using KafkaAppender
Posted by Federico D'Ambrosio <fe...@gmail.com>.
Hi Ralph,
thank you for your response; I'll create the issue shortly.
--
Federico D'Ambrosio
Re: Send LogEvent time as timestamp to Kafka Producer when using KafkaAppender
Posted by Apache <ra...@dslextreme.com>.
It hasn’t been discussed. Feel free to create a Jira issue and a pull request. Please make sure you include a test for your change.
Ralph