You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by vy...@apache.org on 2020/11/12 22:10:24 UTC

[logging-log4j2] 01/01: LOG4J2-2916 Avoid redundant Kafka producer instantiation causing thread leaks.

This is an automated email from the ASF dual-hosted git repository.

vy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/logging-log4j2.git

commit 97db5743a3b10e9017bf70794d6275b21553dd44
Author: Volkan Yazıcı <vo...@gmail.com>
AuthorDate: Thu Nov 12 17:46:20 2020 +0100

    LOG4J2-2916 Avoid redundant Kafka producer instantiation causing thread leaks.
---
 .../logging/log4j/kafka/appender/KafkaManager.java |  4 +-
 .../KafkaManagerProducerThreadLeakTest.java        | 65 ++++++++++++++++++++++
 src/changes/changes.xml                            |  3 +
 3 files changed, 71 insertions(+), 1 deletion(-)

diff --git a/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java b/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java
index a4e4ddb..db03167 100644
--- a/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java
+++ b/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java
@@ -153,7 +153,9 @@ public class KafkaManager extends AbstractManager {
     }
 
     public void startup() {
-        producer = producerFactory.newKafkaProducer(config);
+        if (producer == null) { // create a producer only when this manager holds none yet
+            producer = producerFactory.newKafkaProducer(config);
+        }
     }
 
     public String getTopic() {
diff --git a/log4j-kafka/src/test/java/org/apache/logging/log4j/kafka/appender/KafkaManagerProducerThreadLeakTest.java b/log4j-kafka/src/test/java/org/apache/logging/log4j/kafka/appender/KafkaManagerProducerThreadLeakTest.java
new file mode 100644
index 0000000..73fcf62
--- /dev/null
+++ b/log4j-kafka/src/test/java/org/apache/logging/log4j/kafka/appender/KafkaManagerProducerThreadLeakTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.kafka.appender;
+
+import org.apache.logging.log4j.categories.Appenders;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.junit.LoggerContextSource;
+import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verifies that restarting the {@link LoggerContext} doesn't cause
+ * {@link KafkaManager} to leak threads.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/LOG4J2-2916">LOG4J2-2916</a>
+ */
+@Category(Appenders.Kafka.class)
+@LoggerContextSource("KafkaAppenderTest.xml")
+class KafkaManagerProducerThreadLeakTest {
+
+    @Test
+    void context_restart_shouldnt_leak_producer_threads(final LoggerContext context) {
+
+        // Baseline: number of "kafka-producer*" threads alive before any reconfiguration.
+        final int initialThreadCount = kafkaProducerThreadCount();
+
+        // Reconfigure the supplied context several times; each call counts as one restart.
+        final int contextRestartCount = 3;
+        for (int i = 0; i < contextRestartCount; i++) {
+            context.reconfigure();
+        }
+
+        // The count after the restarts must equal the baseline, i.e., no extra producer threads.
+        final int lastThreadCount = kafkaProducerThreadCount();
+        assertEquals(initialThreadCount, lastThreadCount);
+
+    }
+
+    private static int kafkaProducerThreadCount() { // counts live threads named "kafka-producer*"
+        final long threadCount = Thread
+                .getAllStackTraces()
+                .keySet()
+                .stream()
+                .filter(thread -> thread.getName().startsWith("kafka-producer"))
+                .count();
+        return Math.toIntExact(threadCount); // throws ArithmeticException if the count overflows int
+    }
+
+}
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index 313a922..e191147 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -31,6 +31,9 @@
          - "remove" - Removed
     -->
     <release version="3.0.0" date="2019-xx-xx" description="GA Release 3.0.0">
+      <action issue="LOG4J2-2916" dev="vy" type="fix" due-to="wuqian0808">
+        Avoid redundant Kafka producer instantiation causing thread leaks.
+      </action>
       <action dev="vy" type="update">
         Update jetty-util from 8.2.0.v20160908 to 9.4.31.v20200723.
       </action>