You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by ck...@apache.org on 2019/05/22 21:02:30 UTC
[logging-log4j2] 01/06: LOG4J2-2606: Substantially improve async
logging performance under heavy load
This is an automated email from the ASF dual-hosted git repository.
ckozak pushed a commit to branch LOG4J2-2606
in repository https://gitbox.apache.org/repos/asf/logging-log4j2.git
commit 42dc53d90f2dc2c8de6061056be17d7991efd94e
Author: Carter Kozak <ck...@apache.org>
AuthorDate: Tue May 21 18:39:13 2019 -0400
LOG4J2-2606: Substantially improve async logging performance under heavy load
This change applies synchronization to AsyncLoggerDisruptor blocking enqueue
operations when the asynchronous logging queue is completely full.
This new behavior may be disabled using the boolean property
`AsyncLogger.SynchronizeEnqueueWhenQueueFull` (default true).
Alternatives tested:
* All available LMAX Disruptor `WaitStrategy` implementations.
None of the available implementations provided a substantial
difference in throughput or CPU utilization. In all cases
my processor was stuck at or near 100% across all six cores.
* Based on feedback from https://github.com/LMAX-Exchange/disruptor/issues/266
I attempted to avoid the blocking enqueue entirely in favor of
`LockSupport.parkNanos` with values ranging from 1,000 ns through
1,000,000 ns. A park time of 1,000,000 ns (1 ms) provided the most
throughput among the `parkNanos` values tested, but is too long a
pause for most scenarios.
* Semaphore instead of synchronized. Testing with permits
equivalent to the available processor count yielded higher
CPU utilization and lower throughput than a semaphore
with a single permit. Given that most work occurs on
the (single) background thread, there's not much reason
to allow multiple enqueues simultaneously.
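The benchmarks were updated to add an "ENQUEUE_UNSYNCHRONIZED" policy, which
follows the old unsynchronized path. Compared with it, "ENQUEUE" (the new
synchronized path) shows an improvement when multiple threads log concurrently: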
Benchmark (queueFullPolicy) Mode Cnt Score Error Units
ConcurrentAsyncLoggerToFileBenchmark.concurrentLoggingThreads ENQUEUE thrpt 3 1146649.538 ± 103899.750 ops/s
ConcurrentAsyncLoggerToFileBenchmark.concurrentLoggingThreads ENQUEUE_UNSYNCHRONIZED thrpt 3 422244.124 ± 785429.301 ops/s
ConcurrentAsyncLoggerToFileBenchmark.concurrentLoggingThreads SYNCHRONOUS thrpt 3 1084417.832 ± 354547.695 ops/s
ConcurrentAsyncLoggerToFileBenchmark.singleLoggingThread ENQUEUE thrpt 3 1250748.284 ± 1202507.746 ops/s
ConcurrentAsyncLoggerToFileBenchmark.singleLoggingThread ENQUEUE_UNSYNCHRONIZED thrpt 3 1280794.688 ± 832379.969 ops/s
ConcurrentAsyncLoggerToFileBenchmark.singleLoggingThread SYNCHRONOUS thrpt 3 1227202.214 ± 1398451.960 ops/s
---
.../logging/log4j/core/async/AsyncLogger.java | 4 +-
.../log4j/core/async/AsyncLoggerDisruptor.java | 80 +++++++++++++++++++---
.../logging/log4j/core/async/DisruptorUtil.java | 5 ++
.../jmh/ConcurrentAsyncLoggerToFileBenchmark.java | 25 +++++--
src/changes/changes.xml | 3 +
5 files changed, 100 insertions(+), 17 deletions(-)
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/async/AsyncLogger.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/async/AsyncLogger.java
index c278316..67b36da 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/async/AsyncLogger.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/async/AsyncLogger.java
@@ -178,7 +178,7 @@ public class AsyncLogger extends Logger implements EventTranslatorVararg<RingBuf
final EventRoute eventRoute = loggerDisruptor.getEventRoute(translator.level);
switch (eventRoute) {
case ENQUEUE:
- loggerDisruptor.enqueueLogMessageInfo(translator);
+ loggerDisruptor.enqueueLogMessageQueueFullBlocking(translator);
break;
case SYNCHRONOUS:
logMessageInCurrentThread(translator.fqcn, translator.level, translator.marker, translator.message,
@@ -328,7 +328,7 @@ public class AsyncLogger extends Logger implements EventTranslatorVararg<RingBuf
final EventRoute eventRoute = loggerDisruptor.getEventRoute(level);
switch (eventRoute) {
case ENQUEUE:
- loggerDisruptor.getDisruptor().getRingBuffer().publishEvent(this,
+ loggerDisruptor.enqueueLogMessageQueueFullBlocking(this,
this, // asyncLogger: 0
location, // location: 1
fqcn, // 2
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/async/AsyncLoggerDisruptor.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/async/AsyncLoggerDisruptor.java
index 7a7e546..b827f8c 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/async/AsyncLoggerDisruptor.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/async/AsyncLoggerDisruptor.java
@@ -20,7 +20,9 @@ package org.apache.logging.log4j.core.async;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import com.lmax.disruptor.EventTranslatorVararg;
import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.Marker;
import org.apache.logging.log4j.core.AbstractLifeCycle;
import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
import org.apache.logging.log4j.core.util.Log4jThreadFactory;
@@ -32,6 +34,7 @@ import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
+import org.apache.logging.log4j.message.Message;
/**
* Helper class for async loggers: AsyncLoggerDisruptor handles the mechanics of working with the LMAX Disruptor, and
@@ -43,6 +46,8 @@ class AsyncLoggerDisruptor extends AbstractLifeCycle {
private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
+ private final Object queueFullEnqueueLock = new Object();
+
private volatile Disruptor<RingBufferLogEvent> disruptor;
private String contextName;
@@ -202,32 +207,91 @@ class AsyncLoggerDisruptor extends AbstractLifeCycle {
return false;
}
- public boolean tryPublish(final RingBufferLogEventTranslator translator) {
+ boolean tryPublish(final RingBufferLogEventTranslator translator) {
try {
+ // Note: we deliberately access the volatile disruptor field afresh here.
+ // Avoiding this and using an older reference could result in adding a log event to the disruptor after it
+ // was shut down, which could cause the publishEvent method to hang and never return.
return disruptor.getRingBuffer().tryPublishEvent(translator);
} catch (final NullPointerException npe) {
// LOG4J2-639: catch NPE if disruptor field was set to null in stop()
- LOGGER.warn("[{}] Ignoring log event after log4j was shut down: {} [{}] {}", contextName,
- translator.level, translator.loggerName, translator.message.getFormattedMessage()
- + (translator.thrown == null ? "" : Throwables.toStringList(translator.thrown)));
+ logWarningOnNpeFromDisruptorPublish(translator);
return false;
}
}
- void enqueueLogMessageInfo(final RingBufferLogEventTranslator translator) {
+ void enqueueLogMessageQueueFullBlocking(final RingBufferLogEventTranslator translator) {
+ try {
+ // Note: we deliberately access the volatile disruptor field afresh here.
+ // Avoiding this and using an older reference could result in adding a log event to the disruptor after it
+ // was shut down, which could cause the publishEvent method to hang and never return.
+ if (synchronizeEnqueueWhenQueueFull()) {
+ synchronized (queueFullEnqueueLock) {
+ disruptor.publishEvent(translator);
+ }
+ } else {
+ disruptor.publishEvent(translator);
+ }
+ } catch (final NullPointerException npe) {
+ // LOG4J2-639: catch NPE if disruptor field was set to null in stop()
+ logWarningOnNpeFromDisruptorPublish(translator);
+ }
+ }
+
+ void enqueueLogMessageQueueFullBlocking(
+ final EventTranslatorVararg<RingBufferLogEvent> translator,
+ final AsyncLogger asyncLogger,
+ final StackTraceElement location,
+ final String fqcn,
+ final Level level,
+ final Marker marker,
+ final Message msg,
+ final Throwable thrown) {
try {
// Note: we deliberately access the volatile disruptor field afresh here.
// Avoiding this and using an older reference could result in adding a log event to the disruptor after it
// was shut down, which could cause the publishEvent method to hang and never return.
- disruptor.publishEvent(translator);
+ if (synchronizeEnqueueWhenQueueFull()) {
+ synchronized (queueFullEnqueueLock) {
+ disruptor.getRingBuffer().publishEvent(translator,
+ asyncLogger, // asyncLogger: 0
+ location, // location: 1
+ fqcn, // 2
+ level, // 3
+ marker, // 4
+ msg, // 5
+ thrown); // 6
+ }
+ } else {
+ disruptor.getRingBuffer().publishEvent(translator,
+ asyncLogger, // asyncLogger: 0
+ location, // location: 1
+ fqcn, // 2
+ level, // 3
+ marker, // 4
+ msg, // 5
+ thrown); // 6
+ }
} catch (final NullPointerException npe) {
// LOG4J2-639: catch NPE if disruptor field was set to null in stop()
LOGGER.warn("[{}] Ignoring log event after log4j was shut down: {} [{}] {}", contextName,
- translator.level, translator.loggerName, translator.message.getFormattedMessage()
- + (translator.thrown == null ? "" : Throwables.toStringList(translator.thrown)));
+ level, fqcn, msg.getFormattedMessage()
+ + (thrown == null ? "" : Throwables.toStringList(thrown)));
}
}
+ private boolean synchronizeEnqueueWhenQueueFull() {
+ return DisruptorUtil.SYNCHRONIZE_ENQUEUE_WHEN_QUEUE_IS_FULL
+ // Background thread must never block
+ && backgroundThreadId != Thread.currentThread().getId();
+ }
+
+ private void logWarningOnNpeFromDisruptorPublish(final RingBufferLogEventTranslator translator) {
+ LOGGER.warn("[{}] Ignoring log event after log4j was shut down: {} [{}] {}", contextName,
+ translator.level, translator.loggerName, translator.message.getFormattedMessage()
+ + (translator.thrown == null ? "" : Throwables.toStringList(translator.thrown)));
+ }
+
/**
* Returns whether it is allowed to store non-JDK classes in ThreadLocal objects for efficiency.
*
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/async/DisruptorUtil.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/async/DisruptorUtil.java
index 9e946f9..59d2a7e 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/async/DisruptorUtil.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/async/DisruptorUtil.java
@@ -46,6 +46,11 @@ final class DisruptorUtil {
private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
private static final int RINGBUFFER_NO_GC_DEFAULT_SIZE = 4 * 1024;
+ // LOG4J2-2606: Disruptor spins enqueuing events across multiple threads when the queue is full.
+ // CPU utilization is significantly reduced by restricting access to the enqueue operation.
+ static final boolean SYNCHRONIZE_ENQUEUE_WHEN_QUEUE_IS_FULL = PropertiesUtil.getProperties()
+ .getBooleanProperty("AsyncLogger.SynchronizeEnqueueWhenQueueFull", true);
+
private DisruptorUtil() {
}
diff --git a/log4j-perf/src/main/java/org/apache/logging/log4j/perf/jmh/ConcurrentAsyncLoggerToFileBenchmark.java b/log4j-perf/src/main/java/org/apache/logging/log4j/perf/jmh/ConcurrentAsyncLoggerToFileBenchmark.java
index f765437..f6faad0 100644
--- a/log4j-perf/src/main/java/org/apache/logging/log4j/perf/jmh/ConcurrentAsyncLoggerToFileBenchmark.java
+++ b/log4j-perf/src/main/java/org/apache/logging/log4j/perf/jmh/ConcurrentAsyncLoggerToFileBenchmark.java
@@ -39,6 +39,9 @@ import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
@@ -71,7 +74,7 @@ public class ConcurrentAsyncLoggerToFileBenchmark {
@State(Scope.Benchmark)
public static class BenchmarkState {
- @Param({"ENQUEUE", "SYNCHRONOUS"})
+ @Param({"ENQUEUE", "ENQUEUE_UNSYNCHRONIZED", "SYNCHRONOUS"})
private QueueFullPolicy queueFullPolicy;
private Logger logger;
@@ -95,17 +98,25 @@ public class ConcurrentAsyncLoggerToFileBenchmark {
@SuppressWarnings("unused") // Used by JMH
public enum QueueFullPolicy {
- ENQUEUE("Default"),
- SYNCHRONOUS(SynchronousAsyncQueueFullPolicy.class.getName());
+ ENQUEUE(Collections.singletonMap("log4j2.AsyncQueueFullPolicy", "Default")),
+ ENQUEUE_UNSYNCHRONIZED(new HashMap<>() {{
+ put("log4j2.AsyncQueueFullPolicy", "Default");
+ put("log4j2.AsyncLogger.SynchronizeEnqueueWhenQueueFull", "false");
+ }
+ }),
+ SYNCHRONOUS(Collections.singletonMap("log4j2.AsyncQueueFullPolicy",
+ SynchronousAsyncQueueFullPolicy.class.getName()));
- private final String queueFullPolicy;
+ private final Map<String, String> properties;
- QueueFullPolicy(String queueFullPolicy) {
- this.queueFullPolicy = queueFullPolicy;
+ QueueFullPolicy(Map<String, String> properties) {
+ this.properties = properties;
}
void setProperties() {
- System.setProperty("log4j2.AsyncQueueFullPolicy", queueFullPolicy);
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ System.setProperty(entry.getKey(), entry.getValue());
+ }
}
}
}
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index 53c28d4..ca50ffb 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -436,6 +436,9 @@
<action issue="LOG4J2-2611" dev="ckozak" type="add">
AsyncQueueFullPolicy configuration short values "Default" and "Discard" are case insensitive to avoid confusion.
</action>
+ <action issue="LOG4J2-2606" dev="ckozak" type="fix">
+ Asynchronous logging when the queue is full no longer results in heavy CPU utilization and low throughput.
+ </action>
</release>
<release version="2.11.2" date="2018-MM-DD" description="GA Release 2.11.2">
<action issue="LOG4J2-2500" dev="rgoers" type="fix">