You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2022/08/25 14:11:24 UTC

[tomcat] branch 8.5.x updated: Switch to Executor for async logging

This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch 8.5.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/8.5.x by this push:
     new 702a7cde41 Switch to Executor for async logging
702a7cde41 is described below

commit 702a7cde417038948b066e18b80c5455ddc2b863
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Thu Aug 25 14:29:19 2022 +0100

    Switch to Executor for async logging
    
    Replace logging thread for JULI's AsyncFileHandlerwith an executor to
    protect against failure of the logging thread.
    Based on pull request #545 by Piotr P. Karwasz.
---
 java/org/apache/juli/AsyncFileHandler.java         | 174 +++++++++++----------
 java/org/apache/juli/FileHandler.java              |  97 +++++-------
 java/org/apache/juli/OneLineFormatter.java         |   5 +-
 .../apache/juli/TestAsyncFileHandlerOverflow.java  | 150 ++++++++++++++++++
 webapps/docs/changelog.xml                         |   5 +
 5 files changed, 289 insertions(+), 142 deletions(-)

diff --git a/java/org/apache/juli/AsyncFileHandler.java b/java/org/apache/juli/AsyncFileHandler.java
index 32b0809c96..0d74079ec9 100644
--- a/java/org/apache/juli/AsyncFileHandler.java
+++ b/java/org/apache/juli/AsyncFileHandler.java
@@ -17,9 +17,13 @@
 package org.apache.juli;
 
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.LogRecord;
+
 /**
  * A {@link FileHandler} implementation that uses a queue of log entries.
  *
@@ -39,6 +43,8 @@ import java.util.logging.LogRecord;
  */
 public class AsyncFileHandler extends FileHandler {
 
+    static final String THREAD_PREFIX = "AsyncFileHandlerWriter-";
+
     public static final int OVERFLOW_DROP_LAST    = 1;
     public static final int OVERFLOW_DROP_FIRST   = 2;
     public static final int OVERFLOW_DROP_FLUSH   = 3;
@@ -54,17 +60,12 @@ public class AsyncFileHandler extends FileHandler {
             System.getProperty("org.apache.juli.AsyncMaxRecordCount",
                                Integer.toString(DEFAULT_MAX_RECORDS)));
 
-    protected static final LinkedBlockingDeque<LogEntry> queue =
-            new LinkedBlockingDeque<>(MAX_RECORDS);
-
-    protected static final LoggerThread logger = new LoggerThread();
-
-    static {
-        logger.start();
-    }
+    private static final LoggerExecutorService LOGGER_SERVICE =
+            new LoggerExecutorService(OVERFLOW_DROP_TYPE, MAX_RECORDS);
 
     private final Object closeLock = new Object();
     protected volatile boolean closed = false;
+    private final LoggerExecutorService loggerService;
 
     public AsyncFileHandler() {
         this(null, null, null, DEFAULT_MAX_DAYS);
@@ -75,7 +76,13 @@ public class AsyncFileHandler extends FileHandler {
     }
 
     public AsyncFileHandler(String directory, String prefix, String suffix, int maxDays) {
+        this(directory, prefix, suffix, maxDays, LOGGER_SERVICE);
+    }
+
+    AsyncFileHandler(String directory, String prefix, String suffix, int maxDays,
+            LoggerExecutorService loggerService) {
         super(directory, prefix, suffix, maxDays);
+        this.loggerService = loggerService;
         open();
     }
 
@@ -90,7 +97,7 @@ public class AsyncFileHandler extends FileHandler {
             }
             closed = true;
         }
-        LoggerThread.deregisterHandler();
+        loggerService.deregisterHandler();
         super.close();
     }
 
@@ -105,71 +112,80 @@ public class AsyncFileHandler extends FileHandler {
             }
             closed = false;
         }
-        LoggerThread.registerHandler();
+        loggerService.registerHandler();
         super.open();
     }
 
-
     @Override
-    public void publish(LogRecord record) {
+    public void publish(final LogRecord record) {
         if (!isLoggable(record)) {
             return;
         }
         // fill source entries, before we hand the record over to another
         // thread with another class loader
         record.getSourceMethodName();
-        LogEntry entry = new LogEntry(record, this);
-        boolean added = false;
-        try {
-            while (!added && !queue.offer(entry)) {
-                switch (OVERFLOW_DROP_TYPE) {
-                    case OVERFLOW_DROP_LAST: {
-                        //remove the last added element
-                        queue.pollLast();
-                        break;
-                    }
-                    case OVERFLOW_DROP_FIRST: {
-                        //remove the first element in the queue
-                        queue.pollFirst();
-                        break;
-                    }
-                    case OVERFLOW_DROP_FLUSH: {
-                        added = queue.offer(entry, 1000, TimeUnit.MILLISECONDS);
-                        break;
-                    }
-                    case OVERFLOW_DROP_CURRENT: {
-                        added = true;
-                        break;
-                    }
-                }//switch
-            }//while
-        } catch (InterruptedException x) {
-            // Allow thread to be interrupted and back out of the publish
-            // operation. No further action required.
-        }
-
+        loggerService.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                /*
+                 * During Tomcat shutdown, the Handlers are closed before the
+                 * executor queue is flushed therefore the closed flag is
+                 * ignored if the executor is shutting down.
+                 */
+                if (!closed || loggerService.isTerminating()) {
+                    publishInternal(record);
+                }
+            }
+        });
     }
 
     protected void publishInternal(LogRecord record) {
         super.publish(record);
     }
 
-    protected static class LoggerThread extends Thread {
+
+    static class LoggerExecutorService extends ThreadPoolExecutor {
+
+        private static final ThreadFactory THREAD_FACTORY = new ThreadFactory(THREAD_PREFIX);
 
         /*
          * Implementation note: Use of this count could be extended to
-         * start/stop the LoggerThread but that would require careful locking as
-         * the current size of the queue also needs to be taken into account and
-         * there are lost of edge cases when rapidly starting and stopping
-         * handlers.
+         * start/stop the LoggerExecutorService but that would require careful
+         * locking as the current size of the queue also needs to be taken into
+         * account and there are lost of edge cases when rapidly starting and
+         * stopping handlers.
          */
-        private static final AtomicInteger handlerCount = new AtomicInteger();
+        private final AtomicInteger handlerCount = new AtomicInteger();
+
+        public LoggerExecutorService(final int overflowDropType, final int maxRecords) {
+            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>(maxRecords), THREAD_FACTORY);
+            switch (overflowDropType) {
+                case OVERFLOW_DROP_LAST:
+                default:
+                    setRejectedExecutionHandler(new DropLastPolicy());
+                    break;
+                case OVERFLOW_DROP_FIRST:
+                    setRejectedExecutionHandler(new DiscardOldestPolicy());
+                    break;
+                case OVERFLOW_DROP_FLUSH:
+                    setRejectedExecutionHandler(new DropFlushPolicy());
+                    break;
+                case OVERFLOW_DROP_CURRENT:
+                    setRejectedExecutionHandler(new DiscardPolicy());
+            }
+        }
+
+        @Override
+        public LinkedBlockingDeque<Runnable> getQueue() {
+            return (LinkedBlockingDeque<Runnable>) super.getQueue();
+        }
 
-        public static void registerHandler() {
+        public void registerHandler() {
             handlerCount.incrementAndGet();
         }
 
-        public static void deregisterHandler() {
+        public void deregisterHandler() {
             int newCount = handlerCount.decrementAndGet();
             if (newCount == 0) {
                 try {
@@ -179,54 +195,46 @@ public class AsyncFileHandler extends FileHandler {
                 } catch (IllegalStateException ise) {
                     // JVM is shutting down.
                     // Allow up to 10s for for the queue to be emptied
-                    int sleepCount = 0;
-                    while (!AsyncFileHandler.queue.isEmpty() && sleepCount < 10000) {
-                        try {
-                            Thread.sleep(1);
-                        } catch (InterruptedException e) {
-                            // Ignore
-                        }
-                        sleepCount++;
+                    shutdown();
+                    try {
+                        awaitTermination(10, TimeUnit.SECONDS);
+                    } catch (InterruptedException e) {
+                        // Ignore
                     }
+                    shutdownNow();
                 }
             }
         }
+    }
 
-        public LoggerThread() {
-            this.setDaemon(true);
-            this.setName("AsyncFileHandlerWriter-" + System.identityHashCode(this));
-        }
+
+    private static class DropFlushPolicy implements RejectedExecutionHandler {
 
         @Override
-        public void run() {
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
             while (true) {
+                if (executor.isShutdown()) {
+                    break;
+                }
                 try {
-                    LogEntry entry = queue.take();
-                    entry.flush();
-                } catch (InterruptedException x) {
-                    // Ignore the attempt to interrupt the thread.
-                } catch (Exception x) {
-                    x.printStackTrace();
+                    if (executor.getQueue().offer(r, 1000, TimeUnit.MILLISECONDS)) {
+                        break;
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RejectedExecutionException("Interrupted", e);
                 }
             }
         }
     }
 
-    protected static class LogEntry {
-        private final LogRecord record;
-        private final AsyncFileHandler handler;
-        public LogEntry(LogRecord record, AsyncFileHandler handler) {
-            super();
-            this.record = record;
-            this.handler = handler;
-        }
+    private static class DropLastPolicy implements RejectedExecutionHandler {
 
-        public boolean flush() {
-            if (handler.closed) {
-                return false;
-            } else {
-                handler.publishInternal(record);
-                return true;
+        @Override
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+            if (!executor.isShutdown()) {
+                ((LoggerExecutorService) executor).getQueue().pollLast();
+                executor.execute(r);
             }
         }
     }
diff --git a/java/org/apache/juli/FileHandler.java b/java/org/apache/juli/FileHandler.java
index 7e3cfc8ddc..ef5543af71 100644
--- a/java/org/apache/juli/FileHandler.java
+++ b/java/org/apache/juli/FileHandler.java
@@ -37,7 +37,6 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -95,65 +94,11 @@ import java.util.regex.Pattern;
  */
 public class FileHandler extends Handler {
 
+
     public static final int DEFAULT_MAX_DAYS = -1;
 
     private static final ExecutorService DELETE_FILES_SERVICE =
-            Executors.newSingleThreadExecutor(new ThreadFactory() {
-                private static final String NAME_PREFIX = "FileHandlerLogFilesCleaner-";
-                private final boolean isSecurityEnabled;
-                private final ThreadGroup group;
-                private final AtomicInteger threadNumber = new AtomicInteger(1);
-
-                {
-                    SecurityManager s = System.getSecurityManager();
-                    if (s == null) {
-                        this.isSecurityEnabled = false;
-                        this.group = Thread.currentThread().getThreadGroup();
-                    } else {
-                        this.isSecurityEnabled = true;
-                        this.group = s.getThreadGroup();
-                    }
-                }
-
-                @Override
-                public Thread newThread(Runnable r) {
-                    final ClassLoader loader = Thread.currentThread().getContextClassLoader();
-                    try {
-                        // Threads should not be created by the webapp classloader
-                        if (isSecurityEnabled) {
-                            AccessController.doPrivileged(new PrivilegedAction<Void>() {
-
-                                @Override
-                                public Void run() {
-                                    Thread.currentThread()
-                                            .setContextClassLoader(getClass().getClassLoader());
-                                    return null;
-                                }
-                            });
-                        } else {
-                            Thread.currentThread()
-                                    .setContextClassLoader(getClass().getClassLoader());
-                        }
-                        Thread t = new Thread(group, r,
-                                NAME_PREFIX + threadNumber.getAndIncrement());
-                        t.setDaemon(true);
-                        return t;
-                    } finally {
-                        if (isSecurityEnabled) {
-                            AccessController.doPrivileged(new PrivilegedAction<Void>() {
-
-                                @Override
-                                public Void run() {
-                                    Thread.currentThread().setContextClassLoader(loader);
-                                    return null;
-                                }
-                            });
-                        } else {
-                            Thread.currentThread().setContextClassLoader(loader);
-                        }
-                    }
-                }
-            });
+            Executors.newSingleThreadExecutor(new ThreadFactory("FileHandlerLogFilesCleaner-"));
 
     // ------------------------------------------------------------ Constructor
 
@@ -597,4 +542,42 @@ public class FileHandler extends Handler {
         cal.add(Calendar.DATE, -maxDays);
         return cal.getTime();
     }
+
+    protected static final class ThreadFactory implements java.util.concurrent.ThreadFactory {
+        private final String namePrefix;
+        private final boolean isSecurityEnabled;
+        private final ThreadGroup group;
+        private final AtomicInteger threadNumber = new AtomicInteger(1);
+
+        public ThreadFactory(final String namePrefix) {
+            this.namePrefix = namePrefix;
+            SecurityManager s = System.getSecurityManager();
+            if (s == null) {
+                this.isSecurityEnabled = false;
+                this.group = Thread.currentThread().getThreadGroup();
+            } else {
+                this.isSecurityEnabled = true;
+                this.group = s.getThreadGroup();
+            }
+        }
+
+        @Override
+        public Thread newThread(Runnable r) {
+            final Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement());
+            // Threads should not have as context classloader a webapp classloader
+            if (isSecurityEnabled) {
+                AccessController.doPrivileged(new PrivilegedAction<Void>() {
+                    @Override
+                    public Void run() {
+                        t.setContextClassLoader(ThreadFactory.class.getClassLoader());
+                        return null;
+                    }
+                });
+            } else {
+                t.setContextClassLoader(ThreadFactory.class.getClassLoader());
+            }
+            t.setDaemon(true);
+            return t;
+        }
+    }
 }
diff --git a/java/org/apache/juli/OneLineFormatter.java b/java/org/apache/juli/OneLineFormatter.java
index 8c663c83d7..e481283356 100644
--- a/java/org/apache/juli/OneLineFormatter.java
+++ b/java/org/apache/juli/OneLineFormatter.java
@@ -141,12 +141,13 @@ public class OneLineFormatter extends Formatter {
         // Thread
         sb.append(' ');
         sb.append('[');
-        if (Thread.currentThread() instanceof AsyncFileHandler.LoggerThread) {
+        final String threadName = Thread.currentThread().getName();
+        if (threadName != null && threadName.startsWith(AsyncFileHandler.THREAD_PREFIX)) {
             // If using the async handler can't get the thread name from the
             // current thread.
             sb.append(getThreadName(record.getThreadID()));
         } else {
-            sb.append(Thread.currentThread().getName());
+            sb.append(threadName);
         }
         sb.append(']');
 
diff --git a/test/org/apache/juli/TestAsyncFileHandlerOverflow.java b/test/org/apache/juli/TestAsyncFileHandlerOverflow.java
new file mode 100644
index 0000000000..8bb8903af4
--- /dev/null
+++ b/test/org/apache/juli/TestAsyncFileHandlerOverflow.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.juli;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Locale;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Formatter;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import org.apache.juli.AsyncFileHandler.LoggerExecutorService;
+
+@RunWith(Parameterized.class)
+public class TestAsyncFileHandlerOverflow {
+
+    private static final String PREFIX = "TestAsyncFileHandler.";
+    private static final String SUFFIX = ".log";
+    private static final Logger logger = Logger.getLogger(TestAsyncFileHandlerOverflow.class.getName());
+    {
+        logger.setUseParentHandlers(false);
+    }
+
+    @Parameters
+    public static Collection<Object[]> parameters() {
+        return Arrays.asList(new Object[][] {
+                { Integer.valueOf(AsyncFileHandler.OVERFLOW_DROP_LAST), "START\n1\n3\n" },
+                { Integer.valueOf(AsyncFileHandler.OVERFLOW_DROP_FIRST), "START\n2\n3\n" },
+                { Integer.valueOf(AsyncFileHandler.OVERFLOW_DROP_FLUSH), "START\n1\n2\n3\n" },
+                { Integer.valueOf(AsyncFileHandler.OVERFLOW_DROP_CURRENT), "START\n1\n2\n" } });
+    }
+
+    private final CountDownLatch latch = new CountDownLatch(1);
+    private Path logsDir;
+    private LoggerExecutorService loggerService;
+    private AsyncFileHandler handler;
+
+    private final int overflowDropType;
+    private final String expected;
+
+    public TestAsyncFileHandlerOverflow(final int overflowDropType, final String expected) {
+        this.overflowDropType = overflowDropType;
+        this.expected = expected;
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        final Path logsBase = Paths.get(System.getProperty("tomcat.test.temp", "output/tmp"));
+        Files.createDirectories(logsBase);
+        this.logsDir = Files.createTempDirectory(logsBase, "test");
+        final Formatter formatter = new Formatter() {
+
+            @Override
+            public String format(LogRecord record) {
+                return record.getMessage() + "\n";
+            }
+        };
+        // Setup an executor that blocks until the first rejection
+        this.loggerService = new LoggerExecutorService(overflowDropType, 2) {
+
+            @Override
+            protected void beforeExecute(Thread t, Runnable r) {
+                try {
+                    latch.await();
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                super.beforeExecute(t, r);
+            }
+        };
+        final RejectedExecutionHandler rejectionHandler = loggerService.getRejectedExecutionHandler();
+        loggerService.setRejectedExecutionHandler(new RejectedExecutionHandler() {
+
+            @Override
+            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                latch.countDown();
+                rejectionHandler.rejectedExecution(r, executor);
+            }
+        });
+        this.handler = new AsyncFileHandler(logsDir.toString(), PREFIX, SUFFIX, 1, loggerService);
+        handler.setFormatter(formatter);
+        logger.addHandler(handler);
+        handler.open();
+    }
+
+    @After
+    public void cleanUp() {
+        handler.close();
+        logger.removeHandler(handler);
+    }
+
+    @Test
+    public void testOverFlow() throws IOException, InterruptedException {
+        handler.open();
+        logger.warning("START"); // blocks async thread
+        // these are queued
+        logger.warning("1");
+        logger.warning("2");
+        logger.warning("3"); // overflows executor and unblocks aync thread
+        loggerService.shutdown();
+        // after shutdown was issued
+        logger.warning("IGNORE");
+
+        loggerService.awaitTermination(1, TimeUnit.SECONDS);
+        final ByteArrayOutputStream os = new ByteArrayOutputStream();
+        Files.copy(logsDir.resolve(PREFIX + getLocalDate() + SUFFIX), os);
+        final String actual = new String(os.toByteArray(), StandardCharsets.UTF_8);
+        Assert.assertEquals(expected, actual);
+        handler.close();
+    }
+
+    private static String getLocalDate() {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd", Locale.getDefault());
+        return sdf.format(new Date(System.currentTimeMillis()));
+    }
+}
\ No newline at end of file
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index ad3cf21011..a4620c310e 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -137,6 +137,11 @@
         contain the duplicates. Based on pull request <pr>535</pr> by Mads
         Rolsdorph. (markt)
       </fix>
+      <fix>
+        Replace logging thread for JULI's <code>AsyncFileHandler</code> with an
+        executor to protect against failure of the logging thread. Based on pull
+        request <pr>545</pr> by Piotr P. Karwasz. (markt)
+      </fix>
     </changelog>
   </subsection>
   <subsection name="Coyote">


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org