You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by pk...@apache.org on 2022/10/16 22:46:11 UTC
[logging-log4j2] 02/02: [LOG4J2-2678] Backport Kafka improvements from `master`
This is an automated email from the ASF dual-hosted git repository.
pkarwasz pushed a commit to branch release-2.x
in repository https://gitbox.apache.org/repos/asf/logging-log4j2.git
commit a5923011792bae9b4a0fd2c562a83165f319641b
Author: Piotr P. Karwasz <pi...@karwasz.org>
AuthorDate: Mon Oct 17 00:42:24 2022 +0200
[LOG4J2-2678] Backport Kafka improvements from `master`
---
.../core/appender/mom/kafka/KafkaAppenderTest.java | 1 +
.../src/test/resources/KafkaAppenderTest.xml | 6 +--
.../core/appender/mom/kafka/KafkaAppender.java | 44 ++++++++++++++----
.../core/appender/mom/kafka/KafkaManager.java | 54 ++++++++++++++++++----
src/changes/changes.xml | 5 ++
5 files changed, 88 insertions(+), 22 deletions(-)
diff --git a/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java b/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
index 9e350d925b..b112f3028f 100644
--- a/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
+++ b/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
@@ -50,6 +50,7 @@ import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
diff --git a/log4j-core-test/src/test/resources/KafkaAppenderTest.xml b/log4j-core-test/src/test/resources/KafkaAppenderTest.xml
index a1a9c02868..9dec545010 100644
--- a/log4j-core-test/src/test/resources/KafkaAppenderTest.xml
+++ b/log4j-core-test/src/test/resources/KafkaAppenderTest.xml
@@ -32,12 +32,12 @@
<Property name="bootstrap.servers">localhost:9092</Property>
<Property name="syncSend">false</Property>
</Kafka>
- <Kafka name="KafkaAppenderWithKey" topic="kafka-topic" key="key">
+ <Kafka name="KafkaAppenderWithKey" topic="kafka-topic" key="key" sendEventTimestamp="true">
<PatternLayout pattern="%m"/>
<Property name="timeout.ms">1000</Property>
<Property name="bootstrap.servers">localhost:9092</Property>
</Kafka>
- <Kafka name="KafkaAppenderWithKeyLookup" topic="kafka-topic" key="$${date:dd-MM-yyyy}">
+ <Kafka name="KafkaAppenderWithKeyLookup" topic="kafka-topic" key="$${date:dd-MM-yyyy}" sendEventTimestamp="true">
<PatternLayout pattern="%m"/>
<Property name="timeout.ms">1000</Property>
<Property name="bootstrap.servers">localhost:9092</Property>
@@ -47,7 +47,7 @@
<Property name="timeout.ms">1000</Property>
<Property name="bootstrap.servers">fakeLocalhost:9092</Property>
</Kafka>
- <Kafka name="KafkaAppenderNoEventTimestamp" topic="kafka-topic" key="key" eventTimestamp="false">
+ <Kafka name="KafkaAppenderNoEventTimestamp" topic="kafka-topic" key="key" sendEventTimestamp="false">
<PatternLayout pattern="%m"/>
<Property name="timeout.ms">1000</Property>
<Property name="bootstrap.servers">localhost:9092</Property>
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
index 562dfd59e8..0253f5e5a9 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
@@ -37,6 +37,7 @@ import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
import org.apache.logging.log4j.core.layout.SerializedLayout;
+import org.apache.logging.log4j.core.util.Integers;
/**
* Sends log events to an Apache Kafka topic.
@@ -46,14 +47,15 @@ public final class KafkaAppender extends AbstractAppender {
/**
* Builds KafkaAppender instances.
- *
- * @param <B> The type to build
+ *
+ * @param <B>
+ * The type to build
*/
public static class Builder<B extends Builder<B>> extends AbstractAppender.Builder<B>
implements org.apache.logging.log4j.core.util.Builder<KafkaAppender> {
@PluginAttribute("retryCount")
- private String retryCount;
+ private int retryCount;
@PluginAttribute("topic")
private String topic;
@@ -64,6 +66,9 @@ public final class KafkaAppender extends AbstractAppender {
@PluginAttribute(value = "syncSend", defaultBoolean = true)
private boolean syncSend;
+ @PluginAttribute(value = "sendEventTimestamp", defaultBoolean = false)
+ private boolean sendEventTimestamp;
+
@SuppressWarnings("resource")
@Override
public KafkaAppender build() {
@@ -73,7 +78,7 @@ public final class KafkaAppender extends AbstractAppender {
return null;
}
final KafkaManager kafkaManager = KafkaManager.getManager(getConfiguration().getLoggerContext(), getName(),
- topic, syncSend, getPropertyArray(), key);
+ topic, syncSend, sendEventTimestamp, getPropertyArray(), key);
return new KafkaAppender(getName(), layout, getFilter(), isIgnoreExceptions(), kafkaManager,
getPropertyArray(), getRetryCount());
}
@@ -86,13 +91,16 @@ public final class KafkaAppender extends AbstractAppender {
}
return intRetryCount;
-
}
public String getTopic() {
return topic;
}
+ public boolean isSendEventTimestamp() {
+ return sendEventTimestamp;
+ }
+
public boolean isSyncSend() {
return syncSend;
}
@@ -102,6 +110,22 @@ public final class KafkaAppender extends AbstractAppender {
return asBuilder();
}
+ @Deprecated
+ public B setRetryCount(final String retryCount) {
+ this.retryCount = Integers.parseInt(retryCount, 0);
+ return asBuilder();
+ }
+
+ public B setRetryCount(final int retryCount) {
+ this.retryCount = retryCount;
+ return asBuilder();
+ }
+
+ public B setSendEventTimestamp(boolean sendEventTimestamp) {
+ this.sendEventTimestamp = sendEventTimestamp;
+ return asBuilder();
+ }
+
public B setSyncSend(final boolean syncSend) {
this.syncSend = syncSend;
return asBuilder();
@@ -128,7 +152,7 @@ public final class KafkaAppender extends AbstractAppender {
}
final KafkaManager kafkaManager = KafkaManager.getManager(configuration.getLoggerContext(), name, topic, true,
properties, key);
- return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager, null, null);
+ return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager, null, 0);
}
/**
@@ -143,7 +167,7 @@ public final class KafkaAppender extends AbstractAppender {
/**
* Creates a builder for a KafkaAppender.
- *
+ *
* @return a builder for a KafkaAppender.
*/
@PluginBuilderFactory
@@ -151,13 +175,13 @@ public final class KafkaAppender extends AbstractAppender {
return new Builder<B>().asBuilder();
}
- private final Integer retryCount;
+ private final Integer retryCount;
private final KafkaManager manager;
private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter,
final boolean ignoreExceptions, final KafkaManager manager, final Property[] properties,
- final Integer retryCount) {
+ final int retryCount) {
super(name, filter, layout, ignoreExceptions, properties);
this.manager = Objects.requireNonNull(manager, "manager");
this.retryCount = retryCount;
@@ -221,6 +245,6 @@ public final class KafkaAppender extends AbstractAppender {
} else {
data = layout.toByteArray(event);
}
- manager.send(data);
+ manager.send(data, event.getTimeMillis());
}
}
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
index 378099e734..ab814e460e 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
@@ -25,8 +25,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.appender.AbstractManager;
import org.apache.logging.log4j.core.appender.ManagerFactory;
@@ -50,6 +52,8 @@ public class KafkaManager extends AbstractManager {
private final String topic;
private final String key;
private final boolean syncSend;
+ private final boolean sendTimestamp;
+
private static final KafkaManagerFactory factory = new KafkaManagerFactory();
/*
@@ -58,13 +62,19 @@ public class KafkaManager extends AbstractManager {
*/
public KafkaManager(final LoggerContext loggerContext, final String name, final String topic,
final boolean syncSend, final Property[] properties, final String key) {
+ this(loggerContext, name, topic, syncSend, false, properties, key);
+ }
+
+ private KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final boolean syncSend,
+ final boolean sendTimestamp, final Property[] properties, final String key) {
super(loggerContext, name);
this.topic = Objects.requireNonNull(topic, "topic");
this.syncSend = syncSend;
+ this.sendTimestamp = sendTimestamp;
- config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
- config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
- config.setProperty("batch.size", "0");
+ config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ config.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);
for (final Property property : properties) {
config.setProperty(property.getName(), property.getValue());
@@ -72,7 +82,11 @@ public class KafkaManager extends AbstractManager {
this.key = key;
- this.timeoutMillis = Integers.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
+ String timeoutMillis = config.getProperty("timeout.ms");
+ if (timeoutMillis == null) {
+ timeoutMillis = config.getProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, DEFAULT_TIMEOUT_MILLIS);
+ }
+ this.timeoutMillis = Integers.parseInt(timeoutMillis);
}
@Override
@@ -105,7 +119,12 @@ public class KafkaManager extends AbstractManager {
}
}
+ @Deprecated
public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
+ send(msg, null);
+ }
+
+ public void send(final byte[] msg, final Long eventTimestamp) throws ExecutionException, InterruptedException, TimeoutException {
if (producer != null) {
byte[] newKey = null;
@@ -116,7 +135,9 @@ public class KafkaManager extends AbstractManager {
newKey = key.getBytes(StandardCharsets.UTF_8);
}
- final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, newKey, msg);
+ final Long timestamp = sendTimestamp ? eventTimestamp : null;
+
+ final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, null, timestamp, newKey, msg);
if (syncSend) {
final Future<RecordMetadata> response = producer.send(newRecord);
response.get(timeoutMillis, TimeUnit.MILLISECONDS);
@@ -140,28 +161,42 @@ public class KafkaManager extends AbstractManager {
return topic;
}
+ @Deprecated
public static KafkaManager getManager(final LoggerContext loggerContext, final String name, final String topic,
final boolean syncSend, final Property[] properties, final String key) {
+ return getManager(loggerContext, name, topic, syncSend, false, properties, key);
+ }
+
+ static KafkaManager getManager(final LoggerContext loggerContext, final String name, final String topic,
+ final boolean syncSend, final boolean sendTimestamp, final Property[] properties, final String key) {
StringBuilder sb = new StringBuilder(name);
- sb.append(" ").append(topic).append(" ").append(syncSend + "");
+ sb.append(" ")
+ .append(topic)
+ .append(" ")
+ .append(syncSend)
+ .append(" ")
+ .append(sendTimestamp);
for (Property prop : properties) {
sb.append(" ").append(prop.getName()).append("=").append(prop.getValue());
}
- return getManager(sb.toString(), factory, new FactoryData(loggerContext, topic, syncSend, properties, key));
+ return getManager(sb.toString(), factory, new FactoryData(loggerContext, topic, syncSend, sendTimestamp,
+ properties, key));
}
private static class FactoryData {
private final LoggerContext loggerContext;
private final String topic;
private final boolean syncSend;
+ private final boolean sendTimestamp;
private final Property[] properties;
private final String key;
public FactoryData(final LoggerContext loggerContext, final String topic, final boolean syncSend,
- final Property[] properties, final String key) {
+ final boolean sendTimestamp, final Property[] properties, final String key) {
this.loggerContext = loggerContext;
this.topic = topic;
this.syncSend = syncSend;
+ this.sendTimestamp = sendTimestamp;
this.properties = properties;
this.key = key;
}
@@ -171,7 +206,8 @@ public class KafkaManager extends AbstractManager {
private static class KafkaManagerFactory implements ManagerFactory<KafkaManager, FactoryData> {
@Override
public KafkaManager createManager(String name, FactoryData data) {
- return new KafkaManager(data.loggerContext, name, data.topic, data.syncSend, data.properties, data.key);
+ return new KafkaManager(data.loggerContext, name, data.topic, data.syncSend, data.sendTimestamp,
+ data.properties, data.key);
}
}
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index 49c2090af1..cd68354d80 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -29,6 +29,11 @@
- "update" - Change
- "remove" - Removed
-->
+ <release version="2.19.1" date="TBD" description="GA Release 2.19.1">
+ <action issue="LOG4J2-2678" dev="pkarwasz" type="update" due-to="Federico D'Ambrosio">
+ Add the opt-in `sendEventTimestamp` attribute to KafkaAppender, which sets the LogEvent timestamp on the Kafka ProducerRecord (disabled by default).
+ </action>
+ </release>
<release version="2.19.0" date="2022-09-09" description="GA Release 2.19.0">
<action issue="LOG4J2-3614" dev="vy" type="fix" due-to="strainu">
Harden InstantFormatter against delegate failures.