You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by rp...@apache.org on 2015/12/23 14:38:45 UTC

[06/11] logging-log4j2 git commit: LOG4J2-1080 AsyncAppender delegates to the AsyncEventRouter to determine if/how to log the event at the current queue usage

LOG4J2-1080 AsyncAppender delegates to the AsyncEventRouter to determine if/how to log the event at the current queue usage


Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/1e6167b7
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/1e6167b7
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/1e6167b7

Branch: refs/heads/master
Commit: 1e6167b7f36affc787ce629beb9c9813628e2a3b
Parents: d48b2b7
Author: rpopma <rp...@apache.org>
Authored: Wed Dec 23 22:12:08 2015 +0900
Committer: rpopma <rp...@apache.org>
Committed: Wed Dec 23 22:12:08 2015 +0900

----------------------------------------------------------------------
 .../log4j/core/appender/AsyncAppender.java      | 115 ++++++++++++-------
 1 file changed, 74 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/1e6167b7/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppender.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppender.java
index 5dbf8de..a096b58 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppender.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppender.java
@@ -24,9 +24,14 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.core.Appender;
 import org.apache.logging.log4j.core.Filter;
 import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.async.AsyncEventRouter;
+import org.apache.logging.log4j.core.async.AsyncEventRouterFactory;
+import org.apache.logging.log4j.core.async.DiscardingAsyncEventRouter;
+import org.apache.logging.log4j.core.async.EventRoute;
 import org.apache.logging.log4j.core.async.RingBufferLogEvent;
 import org.apache.logging.log4j.core.config.AppenderControl;
 import org.apache.logging.log4j.core.config.AppenderRef;
@@ -54,7 +59,6 @@ public final class AsyncAppender extends AbstractAppender {
     private static final String SHUTDOWN = "Shutdown";
 
     private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);
-    private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<>();
 
     private final BlockingQueue<Serializable> queue;
     private final int queueSize;
@@ -66,6 +70,7 @@ public final class AsyncAppender extends AbstractAppender {
     private final boolean includeLocation;
     private AppenderControl errorAppender;
     private AsyncThread thread;
+    private AsyncEventRouter asyncEventRouter;
 
     private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
             final String errorRef, final int queueSize, final boolean blocking, final boolean ignoreExceptions,
@@ -107,6 +112,7 @@ public final class AsyncAppender extends AbstractAppender {
         } else if (errorRef == null) {
             throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
         }
+        asyncEventRouter = AsyncEventRouterFactory.create(queueSize);
 
         thread.start();
         super.start();
@@ -123,11 +129,16 @@ public final class AsyncAppender extends AbstractAppender {
             LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
         }
         LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
+
+        if (DiscardingAsyncEventRouter.getDiscardCount(asyncEventRouter) > 0) {
+            LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncEventRouter,
+                    DiscardingAsyncEventRouter.getDiscardCount(asyncEventRouter));
+        }
     }
 
     /**
      * Actual writing occurs here.
-     * 
+     *
      * @param logEvent The LogEvent.
      */
     @Override
@@ -145,54 +156,77 @@ public final class AsyncAppender extends AbstractAppender {
             logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
         }
         final Log4jLogEvent coreEvent = (Log4jLogEvent) logEvent;
+        logEvent(coreEvent);
+    }
+
+    private void logEvent(final Log4jLogEvent logEvent) {
+        final Level logLevel = logEvent.getLevel();
+        final int remainingCapacity = getQueueRemainingCapacity();
+        final EventRoute route = asyncEventRouter.getRoute(thread.getId(), logLevel, queueSize, remainingCapacity);
+        route.logMessage(this, logEvent);
+    }
+
+    /**
+     * FOR INTERNAL USE ONLY.
+     *
+     * @param logEvent the event to log
+     */
+    public void logMessageInCurrentThread(final Log4jLogEvent logEvent) {
+        logEvent.setEndOfBatch(queue.isEmpty());
+        final boolean appendSuccessful = thread.callAppenders(logEvent);
+        logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
+    }
+
+    /**
+     * FOR INTERNAL USE ONLY.
+     *
+     * @param logEvent the event to log
+     */
+    public void logMessageInBackgroundThread(final Log4jLogEvent logEvent) {
         boolean appendSuccessful = false;
         if (blocking) {
-            if (isAppenderThread.get() == Boolean.TRUE && queue.remainingCapacity() == 0) {
-                // LOG4J2-485: avoid deadlock that would result from trying
-                // to add to a full queue from appender thread
-                coreEvent.setEndOfBatch(false); // queue is definitely not empty!
-                appendSuccessful = thread.callAppenders(coreEvent);
-            } else {
-                final Serializable serialized = Log4jLogEvent.serialize(coreEvent, includeLocation);
-                try {
-                    // wait for free slots in the queue
-                    queue.put(serialized);
-                    appendSuccessful = true;
-                } catch (final InterruptedException e) {
-                    // LOG4J2-1049: Some applications use Thread.interrupt() to send
-                    // messages between application threads. This does not necessarily
-                    // mean that the queue is full. To prevent dropping a log message,
-                    // quickly try to offer the event to the queue again.
-                    // (Yes, this means there is a possibility the same event is logged twice.)
-                    //
-                    // Finally, catching the InterruptedException means the
-                    // interrupted flag has been cleared on the current thread.
-                    // This may interfere with the application's expectation of
-                    // being interrupted, so when we are done, we set the interrupted
-                    // flag again.
-                    appendSuccessful = queue.offer(serialized);
-                    if (!appendSuccessful) {
-                        LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
-                                getName());
-                    }
-                    // set the interrupted flag again.
-                    Thread.currentThread().interrupt();
+            final Serializable serialized = Log4jLogEvent.serialize(logEvent, includeLocation);
+            try {
+                // wait for free slots in the queue
+                queue.put(serialized);
+                appendSuccessful = true;
+            } catch (final InterruptedException e) {
+                // LOG4J2-1049: Some applications use Thread.interrupt() to send
+                // messages between application threads. This does not necessarily
+                // mean that the queue is full. To prevent dropping a log message,
+                // quickly try to offer the event to the queue again.
+                // (Yes, this means there is a possibility the same event is logged twice.)
+                //
+                // Finally, catching the InterruptedException means the
+                // interrupted flag has been cleared on the current thread.
+                // This may interfere with the application's expectation of
+                // being interrupted, so when we are done, we set the interrupted
+                // flag again.
+                appendSuccessful = queue.offer(serialized);
+                if (!appendSuccessful) {
+                    LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
+                            getName());
                 }
+                // set the interrupted flag again.
+                Thread.currentThread().interrupt();
             }
         } else {
-            appendSuccessful = queue.offer(Log4jLogEvent.serialize(coreEvent, includeLocation));
+            appendSuccessful = queue.offer(Log4jLogEvent.serialize(logEvent, includeLocation));
             if (!appendSuccessful) {
                 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
             }
         }
+        logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
+    }
+
+    private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final Log4jLogEvent logEvent) {
         if (!appendSuccessful && errorAppender != null) {
-            errorAppender.callAppender(coreEvent);
+            errorAppender.callAppender(logEvent);
         }
     }
-
     /**
      * Create an AsyncAppender.
-     * 
+     *
      * @param appenderRefs The Appenders to reference.
      * @param errorRef An optional Appender to write to if the queue is full or other errors occur.
      * @param blocking True if the Appender should wait when the queue is full. The default is true.
@@ -247,7 +281,6 @@ public final class AsyncAppender extends AbstractAppender {
 
         @Override
         public void run() {
-            isAppenderThread.set(Boolean.TRUE); // LOG4J2-485
             while (!shutdown) {
                 Serializable s;
                 try {
@@ -327,7 +360,7 @@ public final class AsyncAppender extends AbstractAppender {
 
     /**
      * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings.
-     * 
+     *
      * @return the names of the sink appenders
      */
     public String[] getAppenderRefStrings() {
@@ -341,7 +374,7 @@ public final class AsyncAppender extends AbstractAppender {
     /**
      * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with every log event to determine
      * the class and method where the logging call was made.
-     * 
+     *
      * @return {@code true} if location is included with every event, {@code false} otherwise
      */
     public boolean isIncludeLocation() {
@@ -351,7 +384,7 @@ public final class AsyncAppender extends AbstractAppender {
     /**
      * Returns {@code true} if this AsyncAppender will block when the queue is full, or {@code false} if events are
      * dropped when the queue is full.
-     * 
+     *
      * @return whether this AsyncAppender will block or drop events when the queue is full.
      */
     public boolean isBlocking() {
@@ -360,7 +393,7 @@ public final class AsyncAppender extends AbstractAppender {
 
     /**
      * Returns the name of the appender that any errors are logged to or {@code null}.
-     * 
+     *
      * @return the name of the appender that any errors are logged to or {@code null}
      */
     public String getErrorRef() {