You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@logging.apache.org by ck...@apache.org on 2019/05/22 21:02:30 UTC

[logging-log4j2] 01/06: LOG4J2-2606: Substantially improve async logging performance under heavy load

This is an automated email from the ASF dual-hosted git repository.

ckozak pushed a commit to branch LOG4J2-2606
in repository https://gitbox.apache.org/repos/asf/logging-log4j2.git

commit 42dc53d90f2dc2c8de6061056be17d7991efd94e
Author: Carter Kozak <ck...@apache.org>
AuthorDate: Tue May 21 18:39:13 2019 -0400

    LOG4J2-2606: Substantially improve async logging performance under heavy load
    
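    This change serializes (via a `synchronized` lock) the blocking enqueue
    operations of AsyncLoggerDisruptor when the asynchronous logging queue is
    completely full. The background thread never takes this lock, so it can
    never block on it. The new behavior may be disabled by setting the boolean
    property `AsyncLogger.SynchronizeEnqueueWhenQueueFull` (default true) to
    false, e.g. via the system property
    `log4j2.AsyncLogger.SynchronizeEnqueueWhenQueueFull=false`.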
    
    Alternatives tested:
    * All available LMAX Disruptor `WaitStrategy` implementations.
      None of them made a substantial difference in throughput
      or CPU utilization; in every case my processor was stuck
      at or near 100% across all six cores.
    * Based on feedback from https://github.com/LMAX-Exchange/disruptor/issues/266
      I tried avoiding the blocking enqueue entirely in favor of
      `LockSupport.parkNanos` with values ranging from 1,000 to
      1,000,000 nanoseconds. A value of 1,000,000 ns (1 ms) gave the
      highest throughput of the parkNanos values tested, but that
      pause is too long for most scenarios.
    * A `Semaphore` instead of `synchronized`. Using as many permits
      as there are available processors yielded higher CPU
      utilization and lower throughput than a semaphore with a
      single permit. Since most of the work happens on the single
      background thread, there is little reason to allow multiple
      enqueues to proceed concurrently.
    
    Updated the benchmarks with an "ENQUEUE_UNSYNCHRONIZED" case that follows
    the old, unsynchronized path for comparison; "ENQUEUE" (the new
    synchronized path) shows the improvement:
    
    Benchmark                                                           (queueFullPolicy)   Mode  Cnt        Score         Error  Units
    ConcurrentAsyncLoggerToFileBenchmark.concurrentLoggingThreads                 ENQUEUE  thrpt    3  1146649.538 ±  103899.750  ops/s
    ConcurrentAsyncLoggerToFileBenchmark.concurrentLoggingThreads  ENQUEUE_UNSYNCHRONIZED  thrpt    3   422244.124 ±  785429.301  ops/s
    ConcurrentAsyncLoggerToFileBenchmark.concurrentLoggingThreads             SYNCHRONOUS  thrpt    3  1084417.832 ±  354547.695  ops/s
    ConcurrentAsyncLoggerToFileBenchmark.singleLoggingThread                      ENQUEUE  thrpt    3  1250748.284 ± 1202507.746  ops/s
    ConcurrentAsyncLoggerToFileBenchmark.singleLoggingThread       ENQUEUE_UNSYNCHRONIZED  thrpt    3  1280794.688 ±  832379.969  ops/s
    ConcurrentAsyncLoggerToFileBenchmark.singleLoggingThread                  SYNCHRONOUS  thrpt    3  1227202.214 ± 1398451.960  ops/s
---
 .../logging/log4j/core/async/AsyncLogger.java      |  4 +-
 .../log4j/core/async/AsyncLoggerDisruptor.java     | 80 +++++++++++++++++++---
 .../logging/log4j/core/async/DisruptorUtil.java    |  5 ++
 .../jmh/ConcurrentAsyncLoggerToFileBenchmark.java  | 25 +++++--
 src/changes/changes.xml                            |  3 +
 5 files changed, 100 insertions(+), 17 deletions(-)

diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/async/AsyncLogger.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/async/AsyncLogger.java
index c278316..67b36da 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/async/AsyncLogger.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/async/AsyncLogger.java
@@ -178,7 +178,7 @@ public class AsyncLogger extends Logger implements EventTranslatorVararg<RingBuf
         final EventRoute eventRoute = loggerDisruptor.getEventRoute(translator.level);
         switch (eventRoute) {
             case ENQUEUE:
-                loggerDisruptor.enqueueLogMessageInfo(translator);
+                loggerDisruptor.enqueueLogMessageQueueFullBlocking(translator);
                 break;
             case SYNCHRONOUS:
                 logMessageInCurrentThread(translator.fqcn, translator.level, translator.marker, translator.message,
@@ -328,7 +328,7 @@ public class AsyncLogger extends Logger implements EventTranslatorVararg<RingBuf
         final EventRoute eventRoute = loggerDisruptor.getEventRoute(level);
         switch (eventRoute) {
             case ENQUEUE:
-                loggerDisruptor.getDisruptor().getRingBuffer().publishEvent(this,
+                loggerDisruptor.enqueueLogMessageQueueFullBlocking(this,
                         this, // asyncLogger: 0
                         location, // location: 1
                         fqcn, // 2
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/async/AsyncLoggerDisruptor.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/async/AsyncLoggerDisruptor.java
index 7a7e546..b827f8c 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/async/AsyncLoggerDisruptor.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/async/AsyncLoggerDisruptor.java
@@ -20,7 +20,9 @@ package org.apache.logging.log4j.core.async;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import com.lmax.disruptor.EventTranslatorVararg;
 import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.Marker;
 import org.apache.logging.log4j.core.AbstractLifeCycle;
 import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
 import org.apache.logging.log4j.core.util.Log4jThreadFactory;
@@ -32,6 +34,7 @@ import com.lmax.disruptor.TimeoutException;
 import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
+import org.apache.logging.log4j.message.Message;
 
 /**
  * Helper class for async loggers: AsyncLoggerDisruptor handles the mechanics of working with the LMAX Disruptor, and
@@ -43,6 +46,8 @@ class AsyncLoggerDisruptor extends AbstractLifeCycle {
     private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
     private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
 
+    private final Object queueFullEnqueueLock = new Object();
+
     private volatile Disruptor<RingBufferLogEvent> disruptor;
     private String contextName;
 
@@ -202,32 +207,91 @@ class AsyncLoggerDisruptor extends AbstractLifeCycle {
         return false;
     }
 
-    public boolean tryPublish(final RingBufferLogEventTranslator translator) {
+    boolean tryPublish(final RingBufferLogEventTranslator translator) {
         try {
+            // Note: we deliberately access the volatile disruptor field afresh here.
+            // Avoiding this and using an older reference could result in adding a log event to the disruptor after it
+            // was shut down, which could cause the publishEvent method to hang and never return.
             return disruptor.getRingBuffer().tryPublishEvent(translator);
         } catch (final NullPointerException npe) {
             // LOG4J2-639: catch NPE if disruptor field was set to null in stop()
-            LOGGER.warn("[{}] Ignoring log event after log4j was shut down: {} [{}] {}", contextName,
-                    translator.level, translator.loggerName, translator.message.getFormattedMessage()
-                            + (translator.thrown == null ? "" : Throwables.toStringList(translator.thrown)));
+            logWarningOnNpeFromDisruptorPublish(translator);
             return false;
         }
     }
 
-    void enqueueLogMessageInfo(final RingBufferLogEventTranslator translator) {
+    void enqueueLogMessageQueueFullBlocking(final RingBufferLogEventTranslator translator) {
+        try {
+            // Note: we deliberately access the volatile disruptor field afresh here.
+            // Avoiding this and using an older reference could result in adding a log event to the disruptor after it
+            // was shut down, which could cause the publishEvent method to hang and never return.
+            if (synchronizeEnqueueWhenQueueFull()) {
+                synchronized (queueFullEnqueueLock) {
+                    disruptor.publishEvent(translator);
+                }
+            } else {
+                disruptor.publishEvent(translator);
+            }
+        } catch (final NullPointerException npe) {
+            // LOG4J2-639: catch NPE if disruptor field was set to null in stop()
+            logWarningOnNpeFromDisruptorPublish(translator);
+        }
+    }
+
+    void enqueueLogMessageQueueFullBlocking(
+            final EventTranslatorVararg<RingBufferLogEvent> translator,
+            final AsyncLogger asyncLogger,
+            final StackTraceElement location,
+            final String fqcn,
+            final Level level,
+            final Marker marker,
+            final Message msg,
+            final Throwable thrown) {
         try {
             // Note: we deliberately access the volatile disruptor field afresh here.
             // Avoiding this and using an older reference could result in adding a log event to the disruptor after it
             // was shut down, which could cause the publishEvent method to hang and never return.
-            disruptor.publishEvent(translator);
+            if (synchronizeEnqueueWhenQueueFull()) {
+                synchronized (queueFullEnqueueLock) {
+                    disruptor.getRingBuffer().publishEvent(translator,
+                            asyncLogger, // asyncLogger: 0
+                            location, // location: 1
+                            fqcn, // 2
+                            level, // 3
+                            marker, // 4
+                            msg, // 5
+                            thrown); // 6
+                }
+            } else {
+                disruptor.getRingBuffer().publishEvent(translator,
+                        asyncLogger, // asyncLogger: 0
+                        location, // location: 1
+                        fqcn, // 2
+                        level, // 3
+                        marker, // 4
+                        msg, // 5
+                        thrown); // 6
+            }
         } catch (final NullPointerException npe) {
             // LOG4J2-639: catch NPE if disruptor field was set to null in stop()
             LOGGER.warn("[{}] Ignoring log event after log4j was shut down: {} [{}] {}", contextName,
-                    translator.level, translator.loggerName, translator.message.getFormattedMessage()
-                            + (translator.thrown == null ? "" : Throwables.toStringList(translator.thrown)));
+                    level, fqcn, msg.getFormattedMessage()
+                            + (thrown == null ? "" : Throwables.toStringList(thrown)));
         }
     }
 
+    private boolean synchronizeEnqueueWhenQueueFull() {
+        return DisruptorUtil.SYNCHRONIZE_ENQUEUE_WHEN_QUEUE_IS_FULL
+                // Background thread must never block
+                && backgroundThreadId != Thread.currentThread().getId();
+    }
+
+    private void logWarningOnNpeFromDisruptorPublish(final RingBufferLogEventTranslator translator) {
+        LOGGER.warn("[{}] Ignoring log event after log4j was shut down: {} [{}] {}", contextName,
+                translator.level, translator.loggerName, translator.message.getFormattedMessage()
+                        + (translator.thrown == null ? "" : Throwables.toStringList(translator.thrown)));
+    }
+
     /**
      * Returns whether it is allowed to store non-JDK classes in ThreadLocal objects for efficiency.
      *
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/async/DisruptorUtil.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/async/DisruptorUtil.java
index 9e946f9..59d2a7e 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/async/DisruptorUtil.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/async/DisruptorUtil.java
@@ -46,6 +46,11 @@ final class DisruptorUtil {
     private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
     private static final int RINGBUFFER_NO_GC_DEFAULT_SIZE = 4 * 1024;
 
+    // LOG4J2-2606: Disruptor spins enqueuing events across multiple threads when the queue is full.
+    // CPU utilization is significantly reduced by restricting access to the enqueue operation.
+    static final boolean SYNCHRONIZE_ENQUEUE_WHEN_QUEUE_IS_FULL = PropertiesUtil.getProperties()
+            .getBooleanProperty("AsyncLogger.SynchronizeEnqueueWhenQueueFull", true);
+
     private DisruptorUtil() {
     }
 
diff --git a/log4j-perf/src/main/java/org/apache/logging/log4j/perf/jmh/ConcurrentAsyncLoggerToFileBenchmark.java b/log4j-perf/src/main/java/org/apache/logging/log4j/perf/jmh/ConcurrentAsyncLoggerToFileBenchmark.java
index f765437..f6faad0 100644
--- a/log4j-perf/src/main/java/org/apache/logging/log4j/perf/jmh/ConcurrentAsyncLoggerToFileBenchmark.java
+++ b/log4j-perf/src/main/java/org/apache/logging/log4j/perf/jmh/ConcurrentAsyncLoggerToFileBenchmark.java
@@ -39,6 +39,9 @@ import org.openjdk.jmh.annotations.Threads;
 import org.openjdk.jmh.annotations.Warmup;
 
 import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -71,7 +74,7 @@ public class ConcurrentAsyncLoggerToFileBenchmark {
     @State(Scope.Benchmark)
     public static class BenchmarkState {
 
-        @Param({"ENQUEUE", "SYNCHRONOUS"})
+        @Param({"ENQUEUE", "ENQUEUE_UNSYNCHRONIZED", "SYNCHRONOUS"})
         private QueueFullPolicy queueFullPolicy;
 
         private Logger logger;
@@ -95,17 +98,25 @@ public class ConcurrentAsyncLoggerToFileBenchmark {
 
         @SuppressWarnings("unused") // Used by JMH
         public enum QueueFullPolicy {
-            ENQUEUE("Default"),
-            SYNCHRONOUS(SynchronousAsyncQueueFullPolicy.class.getName());
+            ENQUEUE(Collections.singletonMap("log4j2.AsyncQueueFullPolicy", "Default")),
+            ENQUEUE_UNSYNCHRONIZED(new HashMap<>() {{
+                put("log4j2.AsyncQueueFullPolicy", "Default");
+                put("log4j2.AsyncLogger.SynchronizeEnqueueWhenQueueFull", "false");
+            }
+            }),
+            SYNCHRONOUS(Collections.singletonMap("log4j2.AsyncQueueFullPolicy",
+                    SynchronousAsyncQueueFullPolicy.class.getName()));
 
-            private final String queueFullPolicy;
+            private final Map<String, String> properties;
 
-            QueueFullPolicy(String queueFullPolicy) {
-                this.queueFullPolicy = queueFullPolicy;
+            QueueFullPolicy(Map<String, String> properties) {
+                this.properties = properties;
             }
 
             void setProperties() {
-                System.setProperty("log4j2.AsyncQueueFullPolicy", queueFullPolicy);
+                for (Map.Entry<String, String> entry : properties.entrySet()) {
+                    System.setProperty(entry.getKey(), entry.getValue());
+                }
             }
         }
     }
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index 53c28d4..ca50ffb 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -436,6 +436,9 @@
       <action issue="LOG4J2-2611" dev="ckozak" type="add">
         AsyncQueueFullPolicy configuration short values "Default" and "Discard" are case insensitive to avoid confusion.
       </action>
+      <action issue="LOG4J2-2606" dev="ckozak" type="fix">
+        Asynchronous logging when the queue is full no longer results in heavy CPU utilization and low throughput.
+      </action>
     </release>
     <release version="2.11.2" date="2018-MM-DD" description="GA Release 2.11.2">
       <action issue="LOG4J2-2500" dev="rgoers" type="fix">