You are viewing a plain text version of this content. The canonical link for it is not included in this plain text version.
Posted to commits@logging.apache.org by pk...@apache.org on 2022/10/16 22:46:11 UTC

[logging-log4j2] 02/02: [LOG4J2-2678] Backport Kafka improvements from `master`

This is an automated email from the ASF dual-hosted git repository.

pkarwasz pushed a commit to branch release-2.x
in repository https://gitbox.apache.org/repos/asf/logging-log4j2.git

commit a5923011792bae9b4a0fd2c562a83165f319641b
Author: Piotr P. Karwasz <pi...@karwasz.org>
AuthorDate: Mon Oct 17 00:42:24 2022 +0200

    [LOG4J2-2678] Backport Kafka improvements from `master`
---
 .../core/appender/mom/kafka/KafkaAppenderTest.java |  1 +
 .../src/test/resources/KafkaAppenderTest.xml       |  6 +--
 .../core/appender/mom/kafka/KafkaAppender.java     | 44 ++++++++++++++----
 .../core/appender/mom/kafka/KafkaManager.java      | 54 ++++++++++++++++++----
 src/changes/changes.xml                            |  5 ++
 5 files changed, 88 insertions(+), 22 deletions(-)

diff --git a/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java b/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
index 9e350d925b..b112f3028f 100644
--- a/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
+++ b/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
@@ -50,6 +50,7 @@ import org.junit.experimental.categories.Category;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
diff --git a/log4j-core-test/src/test/resources/KafkaAppenderTest.xml b/log4j-core-test/src/test/resources/KafkaAppenderTest.xml
index a1a9c02868..9dec545010 100644
--- a/log4j-core-test/src/test/resources/KafkaAppenderTest.xml
+++ b/log4j-core-test/src/test/resources/KafkaAppenderTest.xml
@@ -32,12 +32,12 @@
       <Property name="bootstrap.servers">localhost:9092</Property>
       <Property name="syncSend">false</Property>
     </Kafka>
-    <Kafka name="KafkaAppenderWithKey" topic="kafka-topic" key="key">
+    <Kafka name="KafkaAppenderWithKey" topic="kafka-topic" key="key" sendEventTimestamp="true">
       <PatternLayout pattern="%m"/>
       <Property name="timeout.ms">1000</Property>
       <Property name="bootstrap.servers">localhost:9092</Property>
     </Kafka>
-    <Kafka name="KafkaAppenderWithKeyLookup" topic="kafka-topic" key="$${date:dd-MM-yyyy}">
+    <Kafka name="KafkaAppenderWithKeyLookup" topic="kafka-topic" key="$${date:dd-MM-yyyy}" sendEventTimestamp="true">
       <PatternLayout pattern="%m"/>
       <Property name="timeout.ms">1000</Property>
       <Property name="bootstrap.servers">localhost:9092</Property>
@@ -47,7 +47,7 @@
       <Property name="timeout.ms">1000</Property>
       <Property name="bootstrap.servers">fakeLocalhost:9092</Property>
     </Kafka>
-    <Kafka name="KafkaAppenderNoEventTimestamp" topic="kafka-topic" key="key" eventTimestamp="false">
+    <Kafka name="KafkaAppenderNoEventTimestamp" topic="kafka-topic" key="key" sendEventTimestamp="false">
       <PatternLayout pattern="%m"/>
       <Property name="timeout.ms">1000</Property>
       <Property name="bootstrap.servers">localhost:9092</Property>
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
index 562dfd59e8..0253f5e5a9 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
@@ -37,6 +37,7 @@ import org.apache.logging.log4j.core.config.plugins.Plugin;
 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
 import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
 import org.apache.logging.log4j.core.layout.SerializedLayout;
+import org.apache.logging.log4j.core.util.Integers;
 
 /**
  * Sends log events to an Apache Kafka topic.
@@ -46,14 +47,15 @@ public final class KafkaAppender extends AbstractAppender {
 
     /**
      * Builds KafkaAppender instances.
-     * 
-     * @param <B> The type to build
+     *
+     * @param <B>
+     *            The type to build
      */
     public static class Builder<B extends Builder<B>> extends AbstractAppender.Builder<B>
             implements org.apache.logging.log4j.core.util.Builder<KafkaAppender> {
 
         @PluginAttribute("retryCount")
-        private String retryCount;
+        private int retryCount;
 
         @PluginAttribute("topic")
         private String topic;
@@ -64,6 +66,9 @@ public final class KafkaAppender extends AbstractAppender {
         @PluginAttribute(value = "syncSend", defaultBoolean = true)
         private boolean syncSend;
 
+        @PluginAttribute(value = "sendEventTimestamp", defaultBoolean = false)
+        private boolean sendEventTimestamp;
+
         @SuppressWarnings("resource")
         @Override
         public KafkaAppender build() {
@@ -73,7 +78,7 @@ public final class KafkaAppender extends AbstractAppender {
                 return null;
             }
             final KafkaManager kafkaManager = KafkaManager.getManager(getConfiguration().getLoggerContext(), getName(),
-                    topic, syncSend, getPropertyArray(), key);
+                    topic, syncSend, sendEventTimestamp, getPropertyArray(), key);
             return new KafkaAppender(getName(), layout, getFilter(), isIgnoreExceptions(), kafkaManager,
                     getPropertyArray(), getRetryCount());
         }
@@ -86,13 +91,16 @@ public final class KafkaAppender extends AbstractAppender {
 
             }
             return intRetryCount;
-
         }
 
         public String getTopic() {
             return topic;
         }
 
+        public boolean isSendEventTimestamp() {
+            return sendEventTimestamp;
+        }
+
         public boolean isSyncSend() {
             return syncSend;
         }
@@ -102,6 +110,22 @@ public final class KafkaAppender extends AbstractAppender {
             return asBuilder();
         }
 
+        @Deprecated
+        public B setRetryCount(final String retryCount) {
+          this.retryCount = Integers.parseInt(retryCount, 0);
+          return asBuilder();
+        }
+
+        public B setRetryCount(final int retryCount) {
+            this.retryCount = retryCount;
+            return asBuilder();
+        }
+
+        public B setSendEventTimestamp(boolean sendEventTimestamp) {
+            this.sendEventTimestamp = sendEventTimestamp;
+            return asBuilder();
+        }
+
         public B setSyncSend(final boolean syncSend) {
             this.syncSend = syncSend;
             return asBuilder();
@@ -128,7 +152,7 @@ public final class KafkaAppender extends AbstractAppender {
         }
         final KafkaManager kafkaManager = KafkaManager.getManager(configuration.getLoggerContext(), name, topic, true,
                 properties, key);
-        return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager, null, null);
+        return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager, null, 0);
     }
 
     /**
@@ -143,7 +167,7 @@ public final class KafkaAppender extends AbstractAppender {
 
     /**
      * Creates a builder for a KafkaAppender.
-     * 
+     *
      * @return a builder for a KafkaAppender.
      */
     @PluginBuilderFactory
@@ -151,13 +175,13 @@ public final class KafkaAppender extends AbstractAppender {
         return new Builder<B>().asBuilder();
     }
 
-    private final Integer retryCount;
+	private final Integer retryCount;
 
     private final KafkaManager manager;
 
     private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter,
             final boolean ignoreExceptions, final KafkaManager manager, final Property[] properties,
-            final Integer retryCount) {
+            final int retryCount) {
         super(name, filter, layout, ignoreExceptions, properties);
         this.manager = Objects.requireNonNull(manager, "manager");
         this.retryCount = retryCount;
@@ -221,6 +245,6 @@ public final class KafkaAppender extends AbstractAppender {
         } else {
             data = layout.toByteArray(event);
         }
-        manager.send(data);
+        manager.send(data, event.getTimeMillis());
     }
 }
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
index 378099e734..ab814e460e 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
@@ -25,8 +25,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.appender.AbstractManager;
 import org.apache.logging.log4j.core.appender.ManagerFactory;
@@ -50,6 +52,8 @@ public class KafkaManager extends AbstractManager {
     private final String topic;
     private final String key;
     private final boolean syncSend;
+    private final boolean sendTimestamp;
+
     private static final KafkaManagerFactory factory = new KafkaManagerFactory();
 
     /*
@@ -58,13 +62,19 @@ public class KafkaManager extends AbstractManager {
      */
     public KafkaManager(final LoggerContext loggerContext, final String name, final String topic,
             final boolean syncSend, final Property[] properties, final String key) {
+        this(loggerContext, name, topic, syncSend, false, properties, key);
+    }
+
+    private KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final boolean syncSend,
+            final boolean sendTimestamp, final Property[] properties, final String key) {
         super(loggerContext, name);
         this.topic = Objects.requireNonNull(topic, "topic");
         this.syncSend = syncSend;
+        this.sendTimestamp = sendTimestamp;
 
-        config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-        config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-        config.setProperty("batch.size", "0");
+        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);
 
         for (final Property property : properties) {
             config.setProperty(property.getName(), property.getValue());
@@ -72,7 +82,11 @@ public class KafkaManager extends AbstractManager {
 
         this.key = key;
 
-        this.timeoutMillis = Integers.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
+        String timeoutMillis = config.getProperty("timeout.ms");
+        if (timeoutMillis == null) {
+            timeoutMillis = config.getProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, DEFAULT_TIMEOUT_MILLIS);
+        }
+        this.timeoutMillis = Integers.parseInt(timeoutMillis);
     }
 
     @Override
@@ -105,7 +119,12 @@ public class KafkaManager extends AbstractManager {
         }
     }
 
+    @Deprecated
     public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
+        send(msg, null);
+    }
+
+    public void send(final byte[] msg, final Long eventTimestamp) throws ExecutionException, InterruptedException, TimeoutException {
         if (producer != null) {
             byte[] newKey = null;
 
@@ -116,7 +135,9 @@ public class KafkaManager extends AbstractManager {
                 newKey = key.getBytes(StandardCharsets.UTF_8);
             }
 
-            final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, newKey, msg);
+            final Long timestamp = sendTimestamp ? eventTimestamp : null;
+
+            final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, null, timestamp, newKey, msg);
             if (syncSend) {
                 final Future<RecordMetadata> response = producer.send(newRecord);
                 response.get(timeoutMillis, TimeUnit.MILLISECONDS);
@@ -140,28 +161,42 @@ public class KafkaManager extends AbstractManager {
         return topic;
     }
 
+    @Deprecated
     public static KafkaManager getManager(final LoggerContext loggerContext, final String name, final String topic,
             final boolean syncSend, final Property[] properties, final String key) {
+        return getManager(loggerContext, name, topic, syncSend, false, properties, key);
+    }
+
+    static KafkaManager getManager(final LoggerContext loggerContext, final String name, final String topic,
+            final boolean syncSend, final boolean sendTimestamp, final Property[] properties, final String key) {
         StringBuilder sb = new StringBuilder(name);
-        sb.append(" ").append(topic).append(" ").append(syncSend + "");
+        sb.append(" ")
+            .append(topic)
+            .append(" ")
+            .append(syncSend)
+            .append(" ")
+            .append(sendTimestamp);
         for (Property prop : properties) {
             sb.append(" ").append(prop.getName()).append("=").append(prop.getValue());
         }
-        return getManager(sb.toString(), factory, new FactoryData(loggerContext, topic, syncSend, properties, key));
+        return getManager(sb.toString(), factory, new FactoryData(loggerContext, topic, syncSend, sendTimestamp,
+                properties, key));
     }
 
     private static class FactoryData {
         private final LoggerContext loggerContext;
         private final String topic;
         private final boolean syncSend;
+        private final boolean sendTimestamp;
         private final Property[] properties;
         private final String key;
 
         public FactoryData(final LoggerContext loggerContext, final String topic, final boolean syncSend,
-                final Property[] properties, final String key) {
+                final boolean sendTimestamp, final Property[] properties, final String key) {
             this.loggerContext = loggerContext;
             this.topic = topic;
             this.syncSend = syncSend;
+            this.sendTimestamp = sendTimestamp;
             this.properties = properties;
             this.key = key;
         }
@@ -171,7 +206,8 @@ public class KafkaManager extends AbstractManager {
     private static class KafkaManagerFactory implements ManagerFactory<KafkaManager, FactoryData> {
         @Override
         public KafkaManager createManager(String name, FactoryData data) {
-            return new KafkaManager(data.loggerContext, name, data.topic, data.syncSend, data.properties, data.key);
+            return new KafkaManager(data.loggerContext, name, data.topic, data.syncSend, data.sendTimestamp,
+                    data.properties, data.key);
         }
     }
 
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index 49c2090af1..cd68354d80 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -29,6 +29,11 @@
          - "update" - Change
          - "remove" - Removed
     -->
+    <release version="2.19.1" date="TBD" description="GA Release 2.19.1">
+      <action issue="LOG4J2-2678" dev="pkarwasz" type="update" due-to="Federico D'Ambrosio">
+        Add LogEvent timestamp to ProducerRecord in KafkaAppender.
+      </action>
+    </release>
     <release version="2.19.0" date="2022-09-09" description="GA Release 2.19.0">
       <action issue="LOG4J2-3614" dev="vy" type="fix" due-to="strainu">
         Harden InstantFormatter against delegate failures.