You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by vy...@apache.org on 2020/11/12 22:11:17 UTC

[logging-log4j2] branch release-2.x updated: LOG4J2-2916 Avoid redundant Kafka producer instantiation causing thread leaks.

This is an automated email from the ASF dual-hosted git repository.

vy pushed a commit to branch release-2.x
in repository https://gitbox.apache.org/repos/asf/logging-log4j2.git


The following commit(s) were added to refs/heads/release-2.x by this push:
     new 4bd4b5e  LOG4J2-2916 Avoid redundant Kafka producer instantiation causing thread leaks.
4bd4b5e is described below

commit 4bd4b5efe2cd1e1da7ef4c84cd205aba0d1187b4
Author: Volkan Yazici <vo...@gmail.com>
AuthorDate: Thu Nov 12 23:07:12 2020 +0100

    LOG4J2-2916 Avoid redundant Kafka producer instantiation causing thread leaks.
---
 .../core/appender/mom/kafka/KafkaManager.java      |  4 +-
 .../kafka/KafkaManagerProducerThreadLeakTest.java  | 66 ++++++++++++++++++++++
 .../KafkaManagerProducerThreadLeakTest.xml         | 31 ++++++++++
 src/changes/changes.xml                            |  3 +
 4 files changed, 103 insertions(+), 1 deletion(-)

diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
index 394cef0..a03cbb1 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
@@ -138,7 +138,9 @@ public class KafkaManager extends AbstractManager {
 	}
 
 	public void startup() {
-		producer = producerFactory.newKafkaProducer(config);
+		if (producer == null) {
+			producer = producerFactory.newKafkaProducer(config);
+		}
 	}
 
 	public String getTopic() {
diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManagerProducerThreadLeakTest.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManagerProducerThreadLeakTest.java
new file mode 100644
index 0000000..3158e91
--- /dev/null
+++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManagerProducerThreadLeakTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.kafka;
+
+import org.apache.logging.log4j.categories.Appenders;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.junit.LoggerContextSource;
+import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verifies that restarting the {@link LoggerContext} doesn't cause
+ * {@link KafkaManager} to leak threads.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/LOG4J2-2916">LOG4J2-2916</a>
+ */
+@Category(Appenders.Kafka.class)
+@LoggerContextSource("KafkaManagerProducerThreadLeakTest.xml")
+class KafkaManagerProducerThreadLeakTest {
+
+    @Test
+    void context_restart_shouldnt_leak_producer_threads(final LoggerContext context) {
+
+        // Determine the initial number of threads.
+        final int initialThreadCount = kafkaProducerThreadCount();
+
+        // Perform context restarts.
+        final int contextRestartCount = 3;
+        for (int i = 0; i < contextRestartCount; i++) {
+            context.reconfigure();
+        }
+
+        // Verify the final thread count.
+        final int lastThreadCount = kafkaProducerThreadCount();
+        assertEquals(initialThreadCount, lastThreadCount);
+
+    }
+
+    private static int kafkaProducerThreadCount() {
+        final long threadCount = Thread
+                .getAllStackTraces()
+                .keySet()
+                .stream()
+                .filter(thread -> thread.getName().startsWith("kafka-producer"))
+                .count();
+        return Math.toIntExact(threadCount);
+    }
+
+}
diff --git a/log4j-core/src/test/resources/KafkaManagerProducerThreadLeakTest.xml b/log4j-core/src/test/resources/KafkaManagerProducerThreadLeakTest.xml
new file mode 100644
index 0000000..09ca852
--- /dev/null
+++ b/log4j-core/src/test/resources/KafkaManagerProducerThreadLeakTest.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache license, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the license for the specific language governing permissions and
+  ~ limitations under the license.
+  -->
+<Configuration name="KafkaManagerProducerThreadLeakTest" status="OFF">
+  <Appenders>
+   <Kafka name="KafkaAppender" topic="kafka-topic">
+      <PatternLayout pattern="[%m]"/>
+      <Property name="bootstrap.servers">localhost:9092</Property>
+      <Property name="timeout.ms">1000</Property>
+    </Kafka>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="KafkaAppender"/>
+    </Root>
+  </Loggers>
+</Configuration>
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index e766d6e..700e1f5 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -30,6 +30,9 @@
          - "remove" - Removed
     -->
     <release version="2.14.1" date="2020-MM-DD" description="GA Release 2.14.1">
+      <action issue="LOG4J2-2916" dev="vy" type="fix" due-to="wuqian0808">
+        Avoid redundant Kafka producer instantiation causing thread leaks.
+      </action>
       <action dev="ggregory" type="update">
         Update org.fusesource.jansi:jansi 1.17.1 -> 2.0.1.
       </action>