Posted to notifications@logging.apache.org by GitBox <gi...@apache.org> on 2020/12/28 22:21:11 UTC

[GitHub] [logging-log4j2] vy opened a new pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

vy opened a new pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [logging-log4j2] carterkozak commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553355305



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantiation)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);

Review comment:
       I believe this approach avoids iterator allocation, with the constraint that it requires a RandomAccess list implementation.
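
The trade-off in this comment can be sketched as follows. This is a minimal illustration, not code from the PR: the class `IndexedLoopSketch` and the method `visitAll` are hypothetical names. It takes the indexed path (no `Iterator` object is created) only when the list advertises `RandomAccess`, and otherwise falls back to ordinary iteration.

```java
import java.util.List;
import java.util.RandomAccess;
import java.util.function.Consumer;

// Hypothetical helper for illustration; not part of Log4j.
final class IndexedLoopSketch {

    private IndexedLoopSketch() {}

    // Visits every element in order.
    // Returns true if the indexed, iterator-free path was taken.
    static <T> boolean visitAll(final List<T> items, final Consumer<? super T> visitor) {
        if (items instanceof RandomAccess) {
            // RandomAccess lists (e.g. ArrayList) are expected to offer fast get(int),
            // so an indexed loop does not need to create an Iterator.
            for (int i = 0; i < items.size(); i++) {
                visitor.accept(items.get(i));
            }
            return true;
        }
        // For lists such as LinkedList, get(int) is linear in the index,
        // so iterate instead (this does create an Iterator).
        for (final T item : items) {
            visitor.accept(item);
        }
        return false;
    }
}
```

If the list passed to the thread is always an `ArrayList` or a similar array-backed list, the guard is unnecessary and the plain indexed loop from the diff is enough.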







[GitHub] [logging-log4j2] garydgregory commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
garydgregory commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553420888



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {

Review comment:
       I am not a fan of following Spring's naming; we need to pick names that make sense FOR LOG4J ;-)







[GitHub] [logging-log4j2] carterkozak commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r554049278



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventDispatcher extends Log4jThread {
+
+    private static final LogEvent STOP_EVENT = new Log4jLogEvent();
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventDispatcher(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventDispatcher-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        dispatchAll();
+        dispatchRemaining();
+    }
+
+    private void dispatchAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            if (event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void dispatchRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null || event == STOP_EVENT) {

Review comment:
       I defer to your judgement on whether or not to make a change here; this was meant to elaborate upon the comment and the complexities of the scenario. We can make changes separately from this PR even if we decide the behavior should be different :-)







[GitHub] [logging-log4j2] vy commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r554011768



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventDispatcher extends Log4jThread {
+
+    private static final LogEvent STOP_EVENT = new Log4jLogEvent();
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventDispatcher(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventDispatcher-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        dispatchAll();
+        dispatchRemaining();
+    }
+
+    private void dispatchAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            if (event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void dispatchRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null || event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void dispatch(final LogEvent event) {
+
+        // Dispatch the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantiation)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable error) {
+                // If no appender is successful, the error appender will get it.
+                // It is okay to simply log it here.
+                if (LOGGER.isTraceEnabled()) {
+                    final String message = String.format(
+                            "%s has failed to call appender %s",
+                            getName(), control.getAppenderName());
+                    LOGGER.trace(message, error);

Review comment:
       Here I try to match against the `trace(..., Throwable)` method. How can I employ *interpolation curly braces* to achieve that?
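
One possible answer, sketched under the assumption that log4j-api is on the classpath: in Log4j 2 parameterized messages, a `Throwable` passed as the last argument and not consumed by a `{}` placeholder is treated as the event's exception rather than being formatted into the text. The class `TrailingThrowableSketch` and its methods are hypothetical names used only for illustration.

```java
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.status.StatusLogger;

// Hypothetical demo class; requires log4j-api on the classpath.
final class TrailingThrowableSketch {

    private static final Logger LOGGER = StatusLogger.getLogger();

    private TrailingThrowableSketch() {}

    static void logFailure(final String dispatcherName, final String appenderName, final Throwable error) {
        // Two placeholders, three arguments: the trailing Throwable is left over,
        // so it is attached to the log event as its exception.
        LOGGER.trace("{} has failed to call appender {}", dispatcherName, appenderName, error);
    }

    // Builds the same message directly, which makes the convention easy to inspect.
    static ParameterizedMessage message(final String dispatcherName, final String appenderName, final Throwable error) {
        return new ParameterizedMessage(
                "{} has failed to call appender {}", dispatcherName, appenderName, error);
    }
}
```

With this form the `String.format` call goes away, and because the message is only formatted when the level is enabled, the surrounding `isTraceEnabled()` guard is arguably no longer needed.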







[GitHub] [logging-log4j2] carterkozak commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553416530



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantiation)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.
+            }
+        }
+
+        // Fallback to the error appender if none has succeeded so far.
+        if (!succeeded && errorAppender != null) {
+            try {
+                errorAppender.callAppender(event);
+            } catch (final Throwable ignored) {
+                // If the error appender also fails, there is nothing further
+                // we can do about it.
+            }
+        }
+
+    }
+
+    void stop(final long timeoutMillis) throws InterruptedException {
+
+        // Mark the completion, if necessary.
+        synchronized (this) {

Review comment:
       I think we can replace this synchronization with an atomic getAndSet (since there are only two possible values) or compareAndSet using AtomicBoolean or an AtomicIntegerFieldUpdater (flat object layout, avoids pointer indirection).
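
The two alternatives named in this comment might look roughly like the sketch below. It is an illustration only, not the PR's implementation; `StopFlagSketch`, `WithAtomicBoolean`, `WithFieldUpdater`, and `markStopped` are hypothetical names. In both variants exactly one caller wins the transition from running to stopped, which is what the `synchronized` block was protecting.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical classes for illustration; not part of Log4j.
final class StopFlagSketch {

    private StopFlagSketch() {}

    // Variant 1: AtomicBoolean. Simple, at the cost of one extra object per instance.
    static final class WithAtomicBoolean {
        private final AtomicBoolean stopped = new AtomicBoolean(false);

        // Returns true only for the single caller that flips the flag.
        boolean markStopped() {
            return stopped.compareAndSet(false, true);
        }

        boolean isStopped() {
            return stopped.get();
        }
    }

    // Variant 2: AtomicIntegerFieldUpdater over a volatile int field.
    // The flag lives inside the object itself, so there is no extra indirection.
    static final class WithFieldUpdater {
        private static final AtomicIntegerFieldUpdater<WithFieldUpdater> STOPPED =
                AtomicIntegerFieldUpdater.newUpdater(WithFieldUpdater.class, "stopped");

        // 0 = running, 1 = stopped; the updater requires a volatile int field.
        private volatile int stopped;

        // Returns true only for the single caller that flips the flag.
        boolean markStopped() {
            return STOPPED.compareAndSet(this, 0, 1);
        }

        boolean isStopped() {
            return stopped != 0;
        }
    }
}
```

A caller such as `stop(...)` could then run its one-time shutdown work only when `markStopped()` returns `true`.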







[GitHub] [logging-log4j2] carterkozak commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553423896



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantion)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.
+            }
+        }
+
+        // Fallback to the error appender if none has succeeded so far.
+        if (!succeeded && errorAppender != null) {
+            try {
+                errorAppender.callAppender(event);
+            } catch (final Throwable ignored) {
+                // If the error appender also fails, there is nothing further
+                // we can do about it.
+            }
+        }
+
+    }
+
+    void stop(final long timeoutMillis) throws InterruptedException {
+
+        // Mark the completion, if necessary.
+        synchronized (this) {
+            if (!stopped) {
+                stopped = true;
+                LOGGER.trace("{} is signaled to stop.", getName());
+            }
+        }
+
+        // There is a slight chance that the thread is not started yet, wait for
+        // it to run. Otherwise, interrupt+join might block.
+        // noinspection StatementWithEmptyBody
+        while (Thread.State.NEW.equals(getState()));
+
+        // Interrupt the thread. (Note that there is neither a check on
+        // "stopped", nor a synchronization here. It is okay to interrupt
+        // concurrently and/or multiple times.)
+        interrupt();

Review comment:
       The previous implementation submitted a sentinel event through the queue to wake the background thread instead of relying on thread interruption.
   I think we should avoid interrupting the thread if at all possible, due to the subtleties of Java interruption. Interruption will actually close sockets if any blocking operations are in progress! This means a socket appender may unexpectedly fail to deliver its final events. I recall some oddities with file I/O as well.
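To make the concern concrete, here is a minimal sketch of the documented behavior of NIO interruptible channels: interrupting a thread that is in a blocking read closes the channel under it. The class name and the use of a `Pipe` are illustrative assumptions only; whether a given appender is affected depends on the I/O classes it uses, which this sketch does not try to establish.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.Pipe;

// Hypothetical demo class; not part of the PR.
class InterruptClosesChannelSketch {

    /** Starts a reader on an empty pipe, interrupts it, and reports what happened. */
    static String demo() {
        final String[] outcome = {"read returned normally"};
        try {
            final Pipe pipe = Pipe.open();
            Thread reader = new Thread(() -> {
                try {
                    // Nothing is ever written, so this read cannot complete on its own.
                    pipe.source().read(ByteBuffer.allocate(16));
                } catch (ClosedByInterruptException e) {
                    // The interrupt closed the channel, not just the pending read.
                    outcome[0] = "ClosedByInterruptException, channel open: "
                            + pipe.source().isOpen();
                } catch (IOException e) {
                    outcome[0] = "other IOException: " + e;
                }
            }, "pipe-reader");
            reader.start();
            reader.interrupt();
            reader.join(5000);
            pipe.sink().close();
        } catch (IOException e) {
            return "setup failed: " + e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "demo interrupted";
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

After the interrupt the channel is closed for good, so any later write through it would fail as well, which is the "fail to deliver final events" risk described above.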
   
   Perhaps we could try to enqueue a sentinel event and fall back to interruption if there's no space in the queue?
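The suggestion above could look roughly like the following sketch. It is not the PR's code: `STOP`, `Worker`, and `requestStop()` are hypothetical names, plain `Object`s stand in for `LogEvent`s, and the sentinel is recognized by identity. `BlockingQueue#offer` is non-blocking and returns `false` when the queue is full, which is what triggers the interruption fallback.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the PR.
class SentinelShutdownSketch {

    // Hypothetical stop marker, recognized by identity rather than by content.
    static final Object STOP = new Object();

    static final class Worker extends Thread {
        final BlockingQueue<Object> queue;
        final AtomicInteger processed = new AtomicInteger();

        Worker(BlockingQueue<Object> queue) {
            super("sentinel-sketch-worker");
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Object event = queue.take();
                    if (event == STOP) {
                        return; // preferred path: woken by the sentinel
                    }
                    processed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                // Fallback path: restore the flag and exit.
                Thread.currentThread().interrupt();
            }
        }

        /** Returns true if the sentinel was enqueued, false if interruption was used. */
        boolean requestStop() {
            boolean enqueued = queue.offer(STOP); // false when the queue is full
            if (!enqueued) {
                interrupt();
            }
            return enqueued;
        }
    }

    static String demo() {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(4);
        queue.add("event-1");
        queue.add("event-2");
        Worker worker = new Worker(queue);
        worker.start();
        boolean viaSentinel = worker.requestStop();
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "sentinel=" + viaSentinel
                + " processed=" + worker.processed.get()
                + " alive=" + worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the sentinel travels through the queue in FIFO order, events enqueued before it are handled first. On the interruption path this sketch simply exits; draining what is left (as `forwardRemaining()` does in the diff) would still be needed.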







[GitHub] [logging-log4j2] carterkozak commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553354888



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {

Review comment:
       +1, I don't think "forward" is the right term here or in the class name.







[GitHub] [logging-log4j2] carterkozak commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553432581



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forward(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forward(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forward(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantion)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.

Review comment:
       I'm unsure what the behavior was before, but I think it's worth a StatusLogger debug line here :-)

##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forward(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forward(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forward(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantion)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.
+            }
+        }
+
+        // Fallback to the error appender if none has succeeded so far.
+        if (!succeeded && errorAppender != null) {
+            try {
+                errorAppender.callAppender(event);
+            } catch (final Throwable ignored) {
+                // If the error appender also fails, there is nothing further
+                // we can do about it.

Review comment:
       We might as well record the throwable here too.
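Both comments in this message ask that the swallowed `Throwable`s be recorded. The sketch below shows the same fan-out-with-fallback shape as the quoted method, with each failure captured instead of ignored. It is deliberately library-free: `Sink` is a hypothetical stand-in for `AppenderControl`, and the returned list stands in for whatever status logging the PR ends up using (presumably the `LOGGER` field shown in the diff).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class; not part of the PR.
class FanOutWithFallbackSketch {

    /** Hypothetical stand-in for an appender. */
    interface Sink {
        void accept(String event) throws Exception;
    }

    /**
     * Delivers the event to every sink and falls back to errorSink only if
     * none succeeded. Returns a description of every failure encountered.
     */
    static List<String> dispatch(String event, List<Sink> sinks, Sink errorSink) {
        List<String> failures = new ArrayList<>();
        boolean succeeded = false;
        for (int i = 0; i < sinks.size(); i++) {
            try {
                sinks.get(i).accept(event);
                succeeded = true;
            } catch (Throwable t) {
                // Record instead of ignoring; delivery continues with the next sink.
                failures.add("sink " + i + " failed: " + t);
            }
        }
        if (!succeeded && errorSink != null) {
            try {
                errorSink.accept(event);
            } catch (Throwable t) {
                // Nothing more can be done for the event, but the cause is kept.
                failures.add("error sink failed: " + t);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        Sink broken = e -> { throw new IllegalStateException("disk full"); };
        List<String> failures =
                dispatch("hello", Collections.singletonList(broken), delivered::add);
        System.out.println("failures: " + failures);
        System.out.println("delivered via fallback: " + delivered);
    }
}
```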







[GitHub] [logging-log4j2] vy commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553998488



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forward(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forward(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forward(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantion)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.
+            }
+        }
+
+        // Fallback to the error appender if none has succeeded so far.
+        if (!succeeded && errorAppender != null) {
+            try {
+                errorAppender.callAppender(event);
+            } catch (final Throwable ignored) {
+                // If the error appender also fails, there is nothing further
+                // we can do about it.

Review comment:
       Done.







[GitHub] [logging-log4j2] vy commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553417392



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantion)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.
+            }
+        }
+
+        // Fallback to the error appender if none has succeeded so far.
+        if (!succeeded && errorAppender != null) {
+            try {
+                errorAppender.callAppender(event);
+            } catch (final Throwable ignored) {
+                // If the error appender also fails, there is nothing further
+                // we can do about it.
+            }
+        }
+
+    }
+
+    void stop(final long timeoutMillis) throws InterruptedException {
+
+        // Mark the completion, if necessary.
+        synchronized (this) {
+            if (!stopped) {
+                stopped = true;
+                LOGGER.trace("{} is signaled to stop.", getName());
+            }
+        }
+
+        // There is a slight chance that the thread is not started yet, wait for
+        // it to run. Otherwise, interrupt+join might block.
+        // noinspection StatementWithEmptyBody
+        while (Thread.State.NEW.equals(getState()));

Review comment:
       Your reasoning makes perfect sense. I wish I were able to answer your question. The only thing I know is that, without this line, `stop()` sometimes blocks. Maybe this is something for you to investigate during your thread-safety check?
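This does not diagnose the hang, but the `Thread.interrupt()` documentation does note that interrupting a thread that is not alive need not have any effect, which is one possible, unconfirmed explanation. If waiting for the thread to start is kept, a latch is one way to do it without spinning on `getState()`. The names below (`Worker`, `started`) are hypothetical and the class is a sketch, not a proposed patch.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the PR.
class StartLatchSketch {

    static final class Worker extends Thread {
        private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        private final CountDownLatch started = new CountDownLatch(1);

        Worker() {
            super("start-latch-sketch-worker");
        }

        @Override
        public void run() {
            started.countDown(); // signal that run() has been entered
            try {
                while (true) {
                    queue.take(); // blocks until interrupted; events are discarded here
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        /** Returns true if the thread terminated within the timeout. */
        boolean stop(long timeoutMillis) throws InterruptedException {
            // Park until run() is entered instead of busy-waiting on Thread.State.NEW.
            if (!started.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false; // the thread did not start within the timeout
            }
            interrupt();
            join(timeoutMillis); // note: a timeout of 0 would wait indefinitely
            return !isAlive();
        }
    }

    static boolean demo() {
        Worker worker = new Worker();
        worker.start();
        try {
            return worker.stop(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("stopped cleanly: " + demo());
    }
}
```

Unlike the empty `while` loop in the diff, the bounded `await` gives up if `start()` is never called, at the cost of one extra field.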







[GitHub] [logging-log4j2] vy commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553996990



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantiation)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.
+            }
+        }
+
+        // Fall back to the error appender if none has succeeded so far.
+        if (!succeeded && errorAppender != null) {
+            try {
+                errorAppender.callAppender(event);
+            } catch (final Throwable ignored) {
+                // If the error appender also fails, there is nothing further
+                // we can do about it.
+            }
+        }
+
+    }
+
+    void stop(final long timeoutMillis) throws InterruptedException {
+
+        // Mark the completion, if necessary.
+        synchronized (this) {
+            if (!stopped) {
+                stopped = true;
+                LOGGER.trace("{} is signaled to stop.", getName());
+            }
+        }
+
+        // There is a slight chance that the thread is not started yet, wait for
+        // it to run. Otherwise, interrupt+join might block.
+        // noinspection StatementWithEmptyBody
+        while (Thread.State.NEW.equals(getState()));
+
+        // Interrupt the thread. (Note that there is neither a check on
+        // "stopped", nor a synchronization here. It is okay to interrupt
+        // concurrently and/or multiple times.)
+        interrupt();

Review comment:
       Done.

##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forward(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forward(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forward(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantiation)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.

Review comment:
       Done.

##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forward(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forward(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forward(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantiation)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.
+            }
+        }
+
+        // Fall back to the error appender if none has succeeded so far.
+        if (!succeeded && errorAppender != null) {
+            try {
+                errorAppender.callAppender(event);
+            } catch (final Throwable ignored) {
+                // If the error appender also fails, there is nothing further
+                // we can do about it.

Review comment:
       Done.

##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forward(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forward(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forward(final LogEvent event) {

Review comment:
       I will kindly pass this.

##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantiation)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.
+            }
+        }
+
+        // Fall back to the error appender if none has succeeded so far.
+        if (!succeeded && errorAppender != null) {
+            try {
+                errorAppender.callAppender(event);
+            } catch (final Throwable ignored) {
+                // If the error appender also fails, there is nothing further
+                // we can do about it.
+            }
+        }
+
+    }
+
+    void stop(final long timeoutMillis) throws InterruptedException {
+
+        // Mark the completion, if necessary.
+        synchronized (this) {
+            if (!stopped) {
+                stopped = true;
+                LOGGER.trace("{} is signaled to stop.", getName());
+            }
+        }
+
+        // There is a slight chance that the thread is not started yet, wait for
+        // it to run. Otherwise, interrupt+join might block.
+        // noinspection StatementWithEmptyBody
+        while (Thread.State.NEW.equals(getState()));

Review comment:
       @carterkozak, I guess we are sort of done, right? This `while` block discussion is the only one left there. I don't have a strong opinion here. How shall we proceed?
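
       For reference, one possible alternative to spinning on `Thread.State.NEW` is to have the thread count down a latch as the first statement of `run()` and let the stop method await that latch with a timeout. The sketch below only illustrates that idea under stated assumptions: `Worker`, `started`, and `stopWorker` are hypothetical names, `Thread.sleep` stands in for the blocking `queue.take()`, and none of it is taken from the PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class StartLatchSketch {

    /** Hypothetical worker (not the PR's class) that signals once run() has been entered. */
    static final class Worker extends Thread {

        private final CountDownLatch started = new CountDownLatch(1);

        @Override
        public void run() {
            // Signal that the thread has left the NEW state and entered run().
            started.countDown();
            try {
                // Stand-in for a blocking queue.take() call.
                Thread.sleep(Long.MAX_VALUE);
            } catch (final InterruptedException ignored) {
                // Interrupted by stopWorker(); fall through and exit.
            }
        }

        /** Returns true if the thread has terminated within the given timeouts. */
        boolean stopWorker(final long timeoutMillis) throws InterruptedException {
            // Block (rather than spin) until run() has been entered.
            if (!started.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false; // The thread was never started in time.
            }
            interrupt();
            join(timeoutMillis);
            return !isAlive();
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        final Worker worker = new Worker();
        worker.start();
        System.out.println(worker.stopWorker(1000));
    }
}
```

       Compared to the empty `while` loop, the latch parks the caller instead of burning CPU, and a thread that is never started makes the stop call time out rather than spin forever. Whether that trade-off is worth the extra field is a judgment call.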

##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {

Review comment:
       Renamed the class to `AsyncAppenderEventDispatcher`.

##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventDispatcher extends Log4jThread {
+
+    private static final LogEvent STOP_EVENT = new Log4jLogEvent();
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventDispatcher(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventDispatcher-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        dispatchAll();
+        dispatchRemaining();
+    }
+
+    private void dispatchAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            if (event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void dispatchRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null || event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void dispatch(final LogEvent event) {
+
+        // Dispatch the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantion)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable error) {
+                // If no appender is successful, the error appender will get it.
+                // It is okay to simply log it here.
+                if (LOGGER.isTraceEnabled()) {
+                    final String message = String.format(
+                            "%s has failed to call appender %s",
+                            getName(), control.getAppenderName());
+                    LOGGER.trace(message, error);

Review comment:
       Here I try to match against the `trace(..., Throwable)` method. How can I employ *interpolation curly braces* to achieve that?

##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventDispatcher extends Log4jThread {
+
+    private static final LogEvent STOP_EVENT = new Log4jLogEvent();
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventDispatcher(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventDispatcher-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        dispatchAll();
+        dispatchRemaining();
+    }
+
+    private void dispatchAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            if (event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void dispatchRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null || event == STOP_EVENT) {

Review comment:
       Shouldn't we simply discard anything that is sent after the `stop()` signal?

##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventDispatcher extends Log4jThread {
+
+    private static final LogEvent STOP_EVENT = new Log4jLogEvent();
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventDispatcher(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventDispatcher-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        dispatchAll();
+        dispatchRemaining();
+    }
+
+    private void dispatchAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            if (event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void dispatchRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null || event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void dispatch(final LogEvent event) {
+
+        // Dispatch the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantion)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable error) {
+                // If no appender is successful, the error appender will get it.
+                // It is okay to simply log it here.
+                if (LOGGER.isTraceEnabled()) {
+                    final String message = String.format(
+                            "%s has failed to call appender %s",
+                            getName(), control.getAppenderName());
+                    LOGGER.trace(message, error);

Review comment:
       Didn't know such a thing existed at all! :open_mouth: Fixed.

##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantion)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.
+            }
+        }
+
+        // Fallback to the error appender if none has succeeded so far.
+        if (!succeeded && errorAppender != null) {
+            try {
+                errorAppender.callAppender(event);
+            } catch (final Throwable ignored) {
+                // If the error appender also fails, there is nothing further
+                // we can do about it.
+            }
+        }
+
+    }
+
+    void stop(final long timeoutMillis) throws InterruptedException {
+
+        // Mark the completion, if necessary.
+        synchronized (this) {
+            if (!stopped) {
+                stopped = true;
+                LOGGER.trace("{} is signaled to stop.", getName());
+            }
+        }
+
+        // There is a slight chance that the thread is not started yet, wait for
+        // it to run. Otherwise, interrupt+join might block.
+        // noinspection StatementWithEmptyBody
+        while (Thread.State.NEW.equals(getState()));

Review comment:
       Let's get all done here. I really want to get this merged and poke Ralph for 2.14.1.

##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventDispatcher extends Log4jThread {
+
+    private static final LogEvent STOP_EVENT = new Log4jLogEvent();
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventDispatcher(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventDispatcher-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        dispatchAll();
+        dispatchRemaining();
+    }
+
+    private void dispatchAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            if (event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void dispatchRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null || event == STOP_EVENT) {

Review comment:
       I think your argument makes sense. Replace `break` with `continue`.
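
A minimal sketch of one reading of the `break` to `continue` suggestion, assuming the intent is to skip the stop sentinel while still flushing whatever else is queued. `StopEventDrainSketch`, `STOP`, and `drain` are hypothetical names for illustration, not code from the PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class StopEventDrainSketch {

    // Hypothetical sentinel, compared by identity like STOP_EVENT in the diff.
    static final String STOP = new String("STOP");

    static List<String> drain(final Queue<String> queue) {
        final List<String> dispatched = new ArrayList<>();
        while (true) {
            // Non-blocking: returns null once the queue is empty.
            final String event = queue.poll();
            if (event == null) {
                break;      // nothing left to flush
            }
            if (event == STOP) {
                continue;   // skip the sentinel, keep draining what follows
            }
            dispatched.add(event);
        }
        return dispatched;
    }
}
```

With `break` in place of `continue`, any event queued behind the sentinel would stay in the queue undelivered.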







[GitHub] [logging-log4j2] vy commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553996990



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantion)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.
+            }
+        }
+
+        // Fallback to the error appender if none has succeeded so far.
+        if (!succeeded && errorAppender != null) {
+            try {
+                errorAppender.callAppender(event);
+            } catch (final Throwable ignored) {
+                // If the error appender also fails, there is nothing further
+                // we can do about it.
+            }
+        }
+
+    }
+
+    void stop(final long timeoutMillis) throws InterruptedException {
+
+        // Mark the completion, if necessary.
+        synchronized (this) {
+            if (!stopped) {
+                stopped = true;
+                LOGGER.trace("{} is signaled to stop.", getName());
+            }
+        }
+
+        // There is a slight chance that the thread is not started yet, wait for
+        // it to run. Otherwise, interrupt+join might block.
+        // noinspection StatementWithEmptyBody
+        while (Thread.State.NEW.equals(getState()));
+
+        // Interrupt the thread. (Note that there is neither a check on
+        // "stopped", nor a synchronization here. It is okay to interrupt
+        // concurrently and/or multiple times.)
+        interrupt();

Review comment:
       Done.

##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forward(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forward(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forward(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantion)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.

Review comment:
       Done.







[GitHub] [logging-log4j2] vy commented on pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#issuecomment-756195475


   > A bit of a challenge to follow the bouncing ball ;-) but I would like to confirm/ask that with the new behavior that flushes events on shutdown, that there is no chance of an app even appearing to hang/delay/wait on shutdown... IOW, when I shutdown my app, I want it to go away _now_ (within a reasonable but _very_ short time).
   
   Yes, AFAICT, when `stop()` is called, the thread is interrupted so that it transitions from `forwardAll()` to `forwardRemaining()`, and the latter is non-blocking. This is also analogous to the earlier behavior.
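
A rough sketch of that two-phase shutdown, assuming a plain `LinkedBlockingQueue` of strings in place of the real `LogEvent` queue. `DrainOnStopSketch` and `runAndStop` are hypothetical names, and the real forwarder also tracks a stopped flag that is left out here:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the forwarder thread; names are illustrative only.
final class DrainOnStopSketch {

    static List<String> runAndStop(final List<String> events) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>(events);
        final List<String> forwarded = new ArrayList<>();
        final Thread worker = new Thread(() -> {
            // Phase 1: blocking loop, analogous to forwardAll().
            while (true) {
                final String event;
                try {
                    event = queue.take();
                } catch (final InterruptedException ignored) {
                    break; // the interrupt is the signal to stop blocking
                }
                forwarded.add(event);
            }
            // Phase 2: non-blocking drain, analogous to forwardRemaining().
            String event;
            while ((event = queue.poll()) != null) {
                forwarded.add(event);
            }
        });
        worker.start();
        worker.interrupt(); // plays the role of stop()
        try {
            worker.join();
        } catch (final InterruptedException error) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(error);
        }
        return forwarded;
    }
}
```

Because `poll()` never waits, the second phase ends as soon as the queue is empty, so shutdown time is bounded by how long the already queued events take to deliver.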





[GitHub] [logging-log4j2] carterkozak commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r554001403



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantion)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.
+            }
+        }
+
+        // Fallback to the error appender if none has succeeded so far.
+        if (!succeeded && errorAppender != null) {
+            try {
+                errorAppender.callAppender(event);
+            } catch (final Throwable ignored) {
+                // If the error appender also fails, there is nothing further
+                // we can do about it.
+            }
+        }
+
+    }
+
+    void stop(final long timeoutMillis) throws InterruptedException {
+
+        // Mark the completion, if necessary.
+        synchronized (this) {
+            if (!stopped) {
+                stopped = true;
+                LOGGER.trace("{} is signaled to stop.", getName());
+            }
+        }
+
+        // There is a slight chance that the thread is not started yet, wait for
+        // it to run. Otherwise, interrupt+join might block.
+        // noinspection StatementWithEmptyBody
+        while (Thread.State.NEW.equals(getState()));

Review comment:
       I'll take another read through this change today -- happy to defer changes on this part to a separate PR.







[GitHub] [logging-log4j2] vy commented on pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#issuecomment-752257307


   > The downside is when an error is raised, we pay the full cost of thread creation and context switching rather than catching and continuing.
   
   Correct. Though note that this does not worsen the situation compared to 2.13.0 and earlier. Up until 2.13.0, we were catching `RuntimeException`s and `Exception`s. In 2.14.0, we replaced these two catch blocks with a single `Throwable` catch. Then all hell broke loose.
   
   > Can we compare this solution with a potential alternative:
   > The `AppenderControl` is reverted to catch `Exception`, not `Throwable`/`Error`, however `AsyncAppender.AsyncThread.run()` is updated to catch `Throwable` in both `callAppenders` and the `errorAppender.callAppender` try/catch?
   
   How are we gonna handle `ThreadDeath` in this case?
   
   In summary, I am really reluctant to add any kind of `catch (Throwable ...)` clauses.
   
   > I believe this would match how the disruptor-based `AsyncContextSelector` implementation works already.
   
   Any code pointers?





[GitHub] [logging-log4j2] carterkozak commented on pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#issuecomment-752744543


   > But once `AsyncAppender` catches the `ThreadDeath`, it is basically dead. Who is gonna restart that thread?
   
   In this case `ThreadDeath` is a `Throwable` like any other: after we catch it, it's handled and gone. Calling `Thread.stop()` from another thread causes that exception to be thrown in the target thread. The background thread is still both alive and in a normal state (e.g. `RUNNABLE`), so it can continue to process the queue.
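
A minimal sketch of the catch-and-continue behaviour described above, not Log4j code. The class name, the `STOP` sentinel, and the `deliver` helper are hypothetical, and a plain `Error` stands in for `ThreadDeath`, since `Thread.stop()` is deprecated. It only illustrates that a background thread which catches a `Throwable` inside its loop stays alive and keeps draining its queue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration; none of these names exist in Log4j.
class CatchAndContinueSketch {

    // Sentinel compared by identity, telling the worker to leave its loop.
    private static final String STOP = new String("STOP");

    /** Processes the events on a background thread and returns those delivered successfully. */
    static List<String> process(final List<String> events) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>(events);
        queue.add(STOP);
        final List<String> delivered = Collections.synchronizedList(new ArrayList<>());
        final Thread worker = new Thread(() -> {
            while (true) {
                String event;
                try {
                    event = queue.take();
                } catch (final InterruptedException ignored) {
                    // Restore the interrupted flag and give up.
                    Thread.currentThread().interrupt();
                    return;
                }
                if (event == STOP) {
                    return;
                }
                try {
                    deliver(event);
                    delivered.add(event);
                } catch (final Throwable ignored) {
                    // Once caught, the failure is handled; the loop simply continues.
                }
            }
        });
        worker.start();
        try {
            worker.join();
        } catch (final InterruptedException error) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the worker", error);
        }
        return delivered;
    }

    // Simulated appender call: events starting with "error" fail with an Error.
    private static void deliver(final String event) {
        if (event.startsWith("error")) {
            throw new Error("simulated failure for " + event);
        }
    }

    public static void main(final String[] args) {
        System.out.println(process(Arrays.asList("a", "error", "b")));
    }
}
```

The event after the failing one is still delivered by the same thread, which is the point being made about the thread remaining usable.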





[GitHub] [logging-log4j2] vy commented on pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#issuecomment-753677605


   @carterkozak, after giving it some thought, I have decided to go with your proposal. In a nutshell, what the most recent change set incorporates is as follows:
   
   - `AppenderControl` catch clauses are demoted from `Throwable` to `Exception`.
   - `AsyncAppender.AsyncThread` is moved to `AsyncAppenderEventForwarder` with plenty of refactoring for simplification.
   - On shutdown, `AsyncAppenderEventForwarder` forwards all the remaining `LogEvent`s, whereas `AsyncAppender.AsyncThread` was ignoring non-`Log4jLogEvent`s.
   - `AsyncAppenderExceptionHandlingTest` is improved for `ThreadDeath`.
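
The shutdown behaviour in the third bullet can be sketched roughly as follows. This is a simplified, single-threaded illustration under stated assumptions, not the code in the PR: the class name and the string-based events are hypothetical, and a sentinel object plays the role of a stop signal. The first phase blocks on `take()` until the sentinel arrives; the second phase drains whatever is left with the non-blocking `poll()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration of "dispatch until stopped, then drain the rest".
class DrainOnStopSketch {

    // Sentinel compared by identity, standing in for a stop signal.
    private static final String STOP_EVENT = new String("STOP");

    /** Returns the events delivered when the stop signal sits between the two given batches. */
    static List<String> runAndDrain(final List<String> beforeStop, final List<String> afterStop) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.addAll(beforeStop);
        queue.add(STOP_EVENT);
        queue.addAll(afterStop);
        final List<String> delivered = new ArrayList<>();

        // Phase 1: blocking consumption until the stop signal is seen.
        while (true) {
            String event;
            try {
                event = queue.take();
            } catch (final InterruptedException ignored) {
                Thread.currentThread().interrupt();
                break;
            }
            if (event == STOP_EVENT) {
                break;
            }
            delivered.add(event);
        }

        // Phase 2: non-blocking drain of everything still queued.
        while (true) {
            final String event = queue.poll();
            if (event == null || event == STOP_EVENT) {
                break;
            }
            delivered.add(event);
        }
        return delivered;
    }

    public static void main(final String[] args) {
        System.out.println(runAndDrain(Arrays.asList("a", "b"), Arrays.asList("c")));
    }
}
```

Using `poll()` in the second phase matters: a blocking `take()` on an empty queue would prevent the thread from ever finishing its shutdown.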





[GitHub] [logging-log4j2] carterkozak commented on pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#issuecomment-752746126


   I think the `OutOfMemoryError` handling is critical. Consider an application which gets a spike of requests and fills the heap (outside of the logging components), but where an asynchronous logging event happens to trigger the first `OutOfMemoryError`. In this case we wouldn't want to stop processing the disruptor queue and block the application, because that would make things even worse. We prefer to gracefully swallow the failure and provide as much detail as possible (see [AbstractAsyncExceptionHandler.java](https://github.com/apache/logging-log4j2/blob/913e87d7747aa8d90b7bd3236c4dca2e3307336a/log4j-core/src/main/java/org/apache/logging/log4j/core/async/AbstractAsyncExceptionHandler.java)).
   
   In an ideal world everyone would use `-XX:+CrashOnOutOfMemoryError` to avoid thinking about OOMEs, but that has drawbacks in some environments as you might imagine :-)





[GitHub] [logging-log4j2] vy commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r554012957



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventDispatcher extends Log4jThread {
+
+    private static final LogEvent STOP_EVENT = new Log4jLogEvent();
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventDispatcher(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventDispatcher-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        dispatchAll();
+        dispatchRemaining();
+    }
+
+    private void dispatchAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            if (event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void dispatchRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null || event == STOP_EVENT) {

Review comment:
       Shouldn't we simply discard anything that is sent after the `stop()` signal?







[GitHub] [logging-log4j2] vy commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553405665



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {

Review comment:
       For the record, with `forwardOne()`, I was trying to stick to the Spring Data conventions: `findOne()`, `findAll()`, etc.







[GitHub] [logging-log4j2] vy commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r554000528



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {

Review comment:
       Renamed the class to `AsyncAppenderEventDispatcher`.







[GitHub] [logging-log4j2] vy commented on pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#issuecomment-752224064


   > I think the only way this can happen is if someone calls [Thread.stop()](https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#stop--), so we must consider the intent when that occurs. Is it accidental, where the asyncappender interaction is collateral damage, or is the system attempting to stop all threads? In the former case we may want to avoid breaking the logger, but in the latter we would not want to restart the background thread.
   
   The use case is reported [here](/apache/logging-log4j2/commit/56436ad2176eac000d2821690e4373f097b76670#r44892412).
   
   > If we're restarting the background thread, why not simply catch ThreadDeath to avoid thread churn and potential bugs that result from the indirection?
   > 
   > Based on the original request when we began catching Error, I don't think the caller was stopping background threads (or using asynchronous logging at all) but rather calling Thread.stop on an application thread that happened to log. Using synchronous logging it would swallow the ThreadDeath error silently and allow the application thread to continue. I think we can allow ThreadDeath to be propagated, and even kill background threads -- if that's what the user wants and the security manager allows it, on their own heads be it!
   
   The major improvement this change set brings is that there is no smart whitelist of failures that are subject to handling. We will not catch anything broader than `Exception`, which is the best practice when dealing with `Throwable`s. At the beginning, we were just catching `Exception`s. Then we figured there are certain `Error`s (e.g., `ExceptionInInitializerError`) that are harmless. Consequently we extended the catch block to `Throwable`s. Then somebody reported we shouldn't catch `ThreadDeath`. Evidently, we can't foresee all the potential corner cases while dealing with exceptions. Hence, in this patch, I just catch `Exception`s and respawn the thread on unexpected failures. I think this is a much simpler and more future-proof approach.
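
A rough sketch of the "catch only `Exception`, respawn on anything else" strategy described above. It is an illustration under stated assumptions rather than the code in this PR: the class, the `STOP` sentinel, and the `deliver` helper are hypothetical, and the respawn is wired through the standard `Thread.setUncaughtExceptionHandler` hook, which may differ from how the patch does it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; none of these names exist in Log4j.
class RespawningWorkerSketch {

    // Sentinel compared by identity, telling the worker to finish.
    private static final String STOP = new String("STOP");

    /** Processes the events, counting in spawnCount how many worker threads were needed. */
    static List<String> process(final List<String> events, final AtomicInteger spawnCount) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>(events);
        queue.add(STOP);
        final List<String> delivered = Collections.synchronizedList(new ArrayList<>());
        final CountDownLatch done = new CountDownLatch(1);
        spawn(queue, delivered, spawnCount, done);
        try {
            done.await();
        } catch (final InterruptedException error) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for completion", error);
        }
        return delivered;
    }

    private static void spawn(
            final BlockingQueue<String> queue,
            final List<String> delivered,
            final AtomicInteger spawnCount,
            final CountDownLatch done) {
        final Thread worker = new Thread(() -> {
            while (true) {
                String event;
                try {
                    event = queue.take();
                } catch (final InterruptedException ignored) {
                    Thread.currentThread().interrupt();
                    done.countDown();
                    return;
                }
                if (event == STOP) {
                    done.countDown();
                    return;
                }
                try {
                    deliver(event);
                    delivered.add(event);
                } catch (final Exception ignored) {
                    // Ordinary failures are swallowed; the event is skipped.
                }
                // Anything else (an Error) escapes and kills this thread.
            }
        }, "worker-" + spawnCount.incrementAndGet());
        // When the worker dies of an uncaught failure, start a replacement.
        worker.setUncaughtExceptionHandler(
                (thread, failure) -> spawn(queue, delivered, spawnCount, done));
        worker.start();
    }

    // Simulated appender call with two failure modes.
    private static void deliver(final String event) {
        if (event.startsWith("exception")) {
            throw new IllegalStateException("simulated exception for " + event);
        }
        if (event.startsWith("error")) {
            throw new Error("simulated error for " + event);
        }
    }

    public static void main(final String[] args) {
        final AtomicInteger spawnCount = new AtomicInteger();
        System.out.println(process(Arrays.asList("a", "exception", "error", "b"), spawnCount));
        System.out.println("threads spawned: " + spawnCount.get());
    }
}
```

The trade-off raised in the review is visible here: an `Exception` costs nothing beyond the catch, whereas every `Error` costs a thread creation.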
   
   > Also note that as written this PR only applies to the AsyncAppender, not the disruptor based fully async AsyncLoggerContextSelector or mixed sync/async logging.
   
   If we agree on the strategy employed here, I can extend it to wherever applicable. (The changes need to be duplicated on `master` anyway.)





[GitHub] [logging-log4j2] carterkozak commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r554001403



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantion)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.
+            }
+        }
+
+        // Fallback to the error appender if none has succeeded so far.
+        if (!succeeded && errorAppender != null) {
+            try {
+                errorAppender.callAppender(event);
+            } catch (final Throwable ignored) {
+                // If the error appender also fails, there is nothing further
+                // we can do about it.
+            }
+        }
+
+    }
+
+    void stop(final long timeoutMillis) throws InterruptedException {
+
+        // Mark the completion, if necessary.
+        synchronized (this) {
+            if (!stopped) {
+                stopped = true;
+                LOGGER.trace("{} is signaled to stop.", getName());
+            }
+        }
+
+        // There is a slight chance that the thread is not started yet, wait for
+        // it to run. Otherwise, interrupt+join might block.
+        // noinspection StatementWithEmptyBody
+        while (Thread.State.NEW.equals(getState()));

Review comment:
       I'll take another read through this change today -- happy to defer changes on this part to a separate PR.

##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventDispatcher extends Log4jThread {
+
+    private static final LogEvent STOP_EVENT = new Log4jLogEvent();
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventDispatcher(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventDispatcher-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        dispatchAll();
+        dispatchRemaining();
+    }
+
+    private void dispatchAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            if (event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void dispatchRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null || event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void dispatch(final LogEvent event) {
+
+        // Dispatch the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantion)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable error) {
+                // If no appender is successful, the error appender will get it.
+                // It is okay to simply log it here.
+                if (LOGGER.isTraceEnabled()) {
+                    final String message = String.format(
+                            "%s has failed to call appender %s",
+                            getName(), control.getAppenderName());
+                    LOGGER.trace(message, error);

Review comment:
       nit: I think we can use interpolation curly braces instead of String.format. Same on line 132.

##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventDispatcher extends Log4jThread {
+
+    private static final LogEvent STOP_EVENT = new Log4jLogEvent();
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventDispatcher(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventDispatcher-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        dispatchAll();
+        dispatchRemaining();
+    }
+
+    private void dispatchAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            if (event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void dispatchRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null || event == STOP_EVENT) {

Review comment:
       The previous implementation would still process events that were submitted after the sentinel but before further submissions started to throw. I think we want a `continue` on `event == STOP_EVENT` rather than `break`.
   
   It's not clear that this will make a difference in practice. Continuing to poll after STOP_EVENT doesn't guarantee that we record all such events; it only catches those that are already visible in the queue.
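
The difference between `break` and `continue` on the sentinel can be sketched in isolation. This is a minimal illustration, not the PR's code: it assumes a plain `Object` sentinel and a `LinkedBlockingQueue<Object>` in place of `STOP_EVENT` and the `LogEvent` queue, and the class and method names (`DrainAfterSentinelSketch`, `drainWithBreak`, `drainWithContinue`, `sampleQueue`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class DrainAfterSentinelSketch {

    // Hypothetical stand-in for STOP_EVENT; compared by identity.
    static final Object STOP = new Object();

    // Mirrors the reviewed loop: stops as soon as the sentinel is polled.
    static List<Object> drainWithBreak(final BlockingQueue<Object> queue) {
        final List<Object> handled = new ArrayList<>();
        while (true) {
            final Object event = queue.poll(); // non-blocking
            if (event == null || event == STOP) {
                break;
            }
            handled.add(event);
        }
        return handled;
    }

    // Suggested variant: skips the sentinel and keeps polling until empty.
    static List<Object> drainWithContinue(final BlockingQueue<Object> queue) {
        final List<Object> handled = new ArrayList<>();
        while (true) {
            final Object event = queue.poll(); // non-blocking
            if (event == null) {
                break;
            }
            if (event == STOP) {
                continue;
            }
            handled.add(event);
        }
        return handled;
    }

    // A queue in which one event landed behind the sentinel.
    static BlockingQueue<Object> sampleQueue() {
        final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        queue.add("E1");
        queue.add(STOP);
        queue.add("E2");
        return queue;
    }

    public static void main(final String[] args) {
        System.out.println("break:    " + drainWithBreak(sampleQueue()));
        System.out.println("continue: " + drainWithContinue(sampleQueue()));
    }
}
```

With `break`, the event queued behind the sentinel stays in the queue unhandled; with `continue`, it is handled. Neither variant can see an event whose `offer` has not happened yet, which is the caveat raised in the comment.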

##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventDispatcher extends Log4jThread {
+
+    private static final LogEvent STOP_EVENT = new Log4jLogEvent();
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventDispatcher(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventDispatcher-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        dispatchAll();
+        dispatchRemaining();
+    }
+
+    private void dispatchAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            if (event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void dispatchRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null || event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void dispatch(final LogEvent event) {
+
+        // Dispatch the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantiation)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable error) {
+                // If no appender is successful, the error appender will get it.
+                // It is okay to simply log it here.
+                if (LOGGER.isTraceEnabled()) {
+                    final String message = String.format(
+                            "%s has failed to call appender %s",
+                            getName(), control.getAppenderName());
+                    LOGGER.trace(message, error);

Review comment:
       I believe as long as there are fewer curly braces than args, and the last arg is a Throwable, it works. (worth a quick validation running locally though).

##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventDispatcher extends Log4jThread {
+
+    private static final LogEvent STOP_EVENT = new Log4jLogEvent();
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventDispatcher(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventDispatcher-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        dispatchAll();
+        dispatchRemaining();
+    }
+
+    private void dispatchAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            if (event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void dispatchRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null || event == STOP_EVENT) {

Review comment:
       > Shouldn't we simply discard anything that is sent after the stop() signal?
   
   I think this depends on how we define "after" and the "stop signal" ;-)
   
   Consider:
   
   Thread T1 calls asyncAppender.append with event E
   T1 checks if the appender is running, this succeeds
   Thread T2 calls asyncAppender.stop()
   T2 sets stopped = true
   T2 submits the STOP_EVENT
   Thread T1 submits event E to the queue
   
   An event is submitted prior to stop, but the STOP_EVENT is received first.
   
   Ideally if the event is ignored, we would provide a status message describing that the event was not recorded because the appender was stopped -- events shouldn't disappear, and ideally we don't leave objects (with references to potentially large user-provided parameters) sitting in the queue on heap.
   That said, continuing to read from the queue after STOP_EVENT doesn't guarantee that we solve the problem (offer may not have been invoked yet!), only makes it less likely that we encounter the race.
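
The interleaving listed above can be replayed deterministically on a single thread to see the resulting queue order. This is only a sketch under stated assumptions: an `AtomicBoolean` stands in for the appender's stopped flag, a plain `Object` for `STOP_EVENT`, and the names `StopRaceSketch` and `replay` are hypothetical. It does not use any Log4j classes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

final class StopRaceSketch {

    // Hypothetical stand-in for STOP_EVENT.
    static final Object STOP = new Object();

    // Replays the T1/T2 steps in the listed order and returns the queue contents.
    static Object[] replay() {
        final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        final AtomicBoolean stopped = new AtomicBoolean(false);
        final Object eventE = "E";

        // T1: append(E) starts and checks that the appender is running.
        final boolean t1SawRunning = !stopped.get();

        // T2: stop() sets the flag and submits the sentinel.
        stopped.set(true);
        queue.offer(STOP);

        // T1: resumes after its (now stale) check and submits E.
        if (t1SawRunning) {
            queue.offer(eventE);
        }

        return queue.toArray();
    }

    public static void main(final String[] args) {
        final Object[] contents = replay();
        for (int i = 0; i < contents.length; i++) {
            final String label = contents[i] == STOP ? "STOP" : String.valueOf(contents[i]);
            System.out.println(i + ": " + label);
        }
    }
}
```

The append call began before the stop, yet the sentinel sits ahead of E in the queue, so a consumer that stops reading at the sentinel never sees E.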

##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventDispatcher extends Log4jThread {
+
+    private static final LogEvent STOP_EVENT = new Log4jLogEvent();
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventDispatcher(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventDispatcher-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        dispatchAll();
+        dispatchRemaining();
+    }
+
+    private void dispatchAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            if (event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void dispatchRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null || event == STOP_EVENT) {

Review comment:
       I defer to your judgement on whether or not to make a change here; this was meant to elaborate upon the comment and the complexities of the scenario. We can make changes separately from this PR even if we decide the behavior should be different :-)







[GitHub] [logging-log4j2] vy commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r554000037



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantiation)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.
+            }
+        }
+
+        // Fallback to the error appender if none has succeeded so far.
+        if (!succeeded && errorAppender != null) {
+            try {
+                errorAppender.callAppender(event);
+            } catch (final Throwable ignored) {
+                // If the error appender also fails, there is nothing further
+                // we can do about it.
+            }
+        }
+
+    }
+
+    void stop(final long timeoutMillis) throws InterruptedException {
+
+        // Mark the completion, if necessary.
+        synchronized (this) {
+            if (!stopped) {
+                stopped = true;
+                LOGGER.trace("{} is signaled to stop.", getName());
+            }
+        }
+
+        // There is a slight chance that the thread is not started yet, wait for
+        // it to run. Otherwise, interrupt+join might block.
+        // noinspection StatementWithEmptyBody
+        while (Thread.State.NEW.equals(getState()));

Review comment:
       @carterkozak, I guess we are sort of done, right? This `while` block discussion is the only one left there. I don't have a strong opinion here. How shall we proceed?







[GitHub] [logging-log4j2] carterkozak commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553433626



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forward(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forward(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forward(final LogEvent event) {

Review comment:
       naming opinions/bikeshedding, feel free to ignore: Since this is called externally when the queue is full in some circumstances, perhaps the behavior would be more obvious if it was named `callAppenders` or even `callAppendersOnCurrentThread`







[GitHub] [logging-log4j2] carterkozak commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r554012652



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventDispatcher extends Log4jThread {
+
+    private static final LogEvent STOP_EVENT = new Log4jLogEvent();
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventDispatcher(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventDispatcher-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        dispatchAll();
+        dispatchRemaining();
+    }
+
+    private void dispatchAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            if (event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void dispatchRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null || event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void dispatch(final LogEvent event) {
+
+        // Dispatch the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantiation)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable error) {
+                // If no appender is successful, the error appender will get it.
+                // It is okay to simply log it here.
+                if (LOGGER.isTraceEnabled()) {
+                    final String message = String.format(
+                            "%s has failed to call appender %s",
+                            getName(), control.getAppenderName());
+                    LOGGER.trace(message, error);

Review comment:
       I believe that as long as there are fewer `{}` placeholders than arguments and the last argument is a `Throwable`, it works (worth a quick local validation, though).
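
The rule described in this comment can be illustrated without a logging library on the classpath. The following is only a sketch of the convention under discussion, not Log4j's implementation: `PlaceholderRule`, `countPlaceholders`, and `trailingThrowable` are hypothetical names, and placeholder escaping is deliberately ignored.

```java
final class PlaceholderRule {

    // Counts occurrences of the "{}" placeholder in a message pattern.
    // Assumption: no escaping rules are applied, unlike a real logging library.
    static int countPlaceholders(final String pattern) {
        int count = 0;
        int index = pattern.indexOf("{}");
        while (index >= 0) {
            count++;
            index = pattern.indexOf("{}", index + 2);
        }
        return count;
    }

    // Returns the last argument if it is a Throwable that no placeholder
    // consumes (fewer placeholders than arguments); otherwise returns null.
    static Throwable trailingThrowable(final String pattern, final Object... args) {
        if (args == null || args.length == 0) {
            return null;
        }
        final Object last = args[args.length - 1];
        final boolean unconsumed = countPlaceholders(pattern) < args.length;
        return (unconsumed && last instanceof Throwable) ? (Throwable) last : null;
    }
}
```

If that convention holds for the status logger, the `String.format` call and the `isTraceEnabled()` guard in the diff could probably be replaced by a single call along the lines of `LOGGER.trace("{} has failed to call appender {}", getName(), control.getAppenderName(), error)`.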







[GitHub] [logging-log4j2] carterkozak edited a comment on pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak edited a comment on pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#issuecomment-756187667


   One point of clarification: I don't think the `Log4jLogEvent` change results in any different behavior. Each event (with the exception of `SHUTDOWN_LOG_EVENT`) is transformed to a `Log4jLogEvent` in the `AsyncAppender.append` method here:
   https://github.com/apache/logging-log4j2/blob/d96869bc3b1225f1b080ce44e6a27c11f9d6828b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppender.java#L162
   
   I still need to review the shutdown mechanism for thread safety, although at a glance the queue-empty checks that exist on release-2.x aren't entirely threadsafe either...
   





[GitHub] [logging-log4j2] carterkozak commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553412249



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantiation)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.
+            }
+        }
+
+        // Fallback to the error appender if none has succeeded so far.
+        if (!succeeded && errorAppender != null) {
+            try {
+                errorAppender.callAppender(event);
+            } catch (final Throwable ignored) {
+                // If the error appender also fails, there is nothing further
+                // we can do about it.
+            }
+        }
+
+    }
+
+    void stop(final long timeoutMillis) throws InterruptedException {
+
+        // Mark the completion, if necessary.
+        synchronized (this) {
+            if (!stopped) {
+                stopped = true;
+                LOGGER.trace("{} is signaled to stop.", getName());
+            }
+        }
+
+        // There is a slight chance that the thread is not started yet, wait for
+        // it to run. Otherwise, interrupt+join might block.
+        // noinspection StatementWithEmptyBody
+        while (Thread.State.NEW.equals(getState()));

Review comment:
       I wonder if we could add a thread state check around join instead? If the thread is still initializing, we've already set `stopped` to `true`, so it should exit almost immediately if I understand correctly.







[GitHub] [logging-log4j2] carterkozak commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553412249



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantiation)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.
+            }
+        }
+
+        // Fallback to the error appender if none has succeeded so far.
+        if (!succeeded && errorAppender != null) {
+            try {
+                errorAppender.callAppender(event);
+            } catch (final Throwable ignored) {
+                // If the error appender also fails, there is nothing further
+                // we can do about it.
+            }
+        }
+
+    }
+
+    void stop(final long timeoutMillis) throws InterruptedException {
+
+        // Mark the completion, if necessary.
+        synchronized (this) {
+            if (!stopped) {
+                stopped = true;
+                LOGGER.trace("{} is signaled to stop.", getName());
+            }
+        }
+
+        // There is a slight chance that the thread is not started yet, wait for
+        // it to run. Otherwise, interrupt+join might block.
+        // noinspection StatementWithEmptyBody
+        while (Thread.State.NEW.equals(getState()));

Review comment:
       I wonder if we could add a thread state check around `join(timeoutMillis);` instead? If the thread is still initializing, we've already set `stopped` to `true`, so it should exit almost immediately if I understand correctly.
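
The suggestion in this comment can be sketched with the JDK alone. This is an illustration under stated assumptions, not the PR's code: `StoppableWorker` and `stopAndJoin` are hypothetical names, and `Thread.sleep` stands in for the blocking `queue.take()` call.

```java
final class StoppableWorker extends Thread {

    private volatile boolean stopped = false;

    @Override
    public void run() {
        while (!stopped) {
            try {
                // Stand-in for the blocking queue.take() in the real forwarder.
                Thread.sleep(10);
            } catch (final InterruptedException ignored) {
                // Restore the interrupted flag and leave the loop.
                interrupt();
                break;
            }
        }
    }

    // Signals the worker to stop. Interrupt and join are attempted only if the
    // thread has been started; a thread that is still NEW will observe
    // stopped == true and exit right away once it eventually runs.
    // Returns true if the thread is not alive when the method returns.
    boolean stopAndJoin(final long timeoutMillis) throws InterruptedException {
        stopped = true;
        if (getState() != Thread.State.NEW) {
            interrupt();
            join(timeoutMillis);
        }
        return !isAlive();
    }
}
```

Compared with spinning on `Thread.State.NEW`, this variant never busy-waits: the flag is set first, so skipping the join for a not-yet-started thread should be safe as long as `run()` checks the flag before blocking.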







[GitHub] [logging-log4j2] garydgregory commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
garydgregory commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553346306



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantiation)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);

Review comment:
       A for-each loop would be less cluttered here, IMO.

##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {

Review comment:
       I would rename this to `forward`, since the method always takes a single argument and there is no `forwardTwo` method to distinguish it from ;-) Or am I missing something?







[GitHub] [logging-log4j2] carterkozak commented on pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#issuecomment-756187667


   One point of clarification: I don't think the `Log4jLogEvent` change results in any different behavior. Each event (with the exception of `SHUTDOWN_LOG_EVENT`) is transformed to a `Log4jLogEvent` in the `AsyncAppender.append` method [here](https://github.com/apache/logging-log4j2/pull/452/files#diff-6b30d73942796a35c3bcf93d46127d03cb6959fb69fb8eb84c4c6fca13bb68e6R155).
   
   I still need to review the shutdown mechanism for thread safety, although at a glance the queue-empty checks that exist on release-2.x aren't entirely threadsafe either...
   





[GitHub] [logging-log4j2] carterkozak commented on pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#issuecomment-752246841


   > The major improvement this change set brings is that there is no smart whitelist of failures that are subject to handling.
   
   The downside is that when an error is raised, we pay the full cost of thread creation and context switching rather than catching and continuing.
   
   Can we compare this solution with a potential alternative?
   `AppenderControl` is reverted to catch `Exception` rather than `Throwable`/`Error`, while `AsyncAppender.AsyncThread.run()` is updated to catch `Throwable` in both the `callAppenders` and the `errorAppender.callAppender` try/catch blocks. I believe this would match how the disruptor-based `AsyncContextSelector` implementation already works.
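
The catch-and-continue alternative proposed in this comment can be sketched with the JDK alone. This is a rough illustration, not Log4j code: `CatchAndContinueLoop` and `drain` are hypothetical names, `String` stands in for `LogEvent`, and `Consumer<String>` stands in for an `AppenderControl`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

final class CatchAndContinueLoop {

    // Drains the queue on the calling thread. Any Throwable raised by the
    // primary appender is caught so that the same thread keeps running; the
    // event is then offered to the error appender. Returns the number of
    // events accepted by either the primary or the error appender.
    static int drain(
            final BlockingQueue<String> queue,
            final Consumer<String> appender,
            final Consumer<String> errorAppender) {
        int delivered = 0;
        String event;
        while ((event = queue.poll()) != null) {
            try {
                appender.accept(event);
                delivered++;
            } catch (final Throwable primaryFailure) {
                try {
                    errorAppender.accept(event);
                    delivered++;
                } catch (final Throwable ignored) {
                    // If the error appender also fails, the event is dropped.
                }
            }
        }
        return delivered;
    }
}
```

The trade-off is the one debated in this thread: no thread is created on failure, but the loop also swallows throwables such as `OutOfMemoryError` that may leave the thread in a questionable state.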





[GitHub] [logging-log4j2] vy commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553999256



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forward(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forward(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forward(final LogEvent event) {

Review comment:
       I will kindly pass this.







[GitHub] [logging-log4j2] vy commented on pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#issuecomment-752739475


   > I hope I'm not coming off too strong here
   
   Not at all! Somebody is thinking along with me free of charge! What else can I ask for? I am really grateful that you are taking the time for this. :bow:
   
   > In my proposal above, the AsyncAppender would catch it and warn/ignore as it catches all throwables, but the AppenderControl would allow ThreadDeath to propagate, bringing back 2.13.x behavior. The difference is that the AppenderControl can throw errors again, but background threads must be robust against them.
   
   But once `AsyncAppender` catches the `ThreadDeath`, it is basically dead. Who is gonna restart that thread?
   
   > This PR makes the AsyncAppender robust against Error by transitioning to another thread, but I'm not sure why we need to change threads instead of allowing the original thread to continue.
   
   Because the original thread is dead as indicated by `ThreadDeath`?
   
   > > In summary, I am really reluctant to add any kind of `catch (Throwable ...)` clauses.
   > 
   > I think the reason this strikes me as odd is that this implementation is already using a `catch (Throwable ...)`, but without writing it directly. We're using an uncaught exception handler to catch `Throwable`, handle it by logging to the `StatusLogger`, and start another thread. If we explicitly write the try/catch instead, we can produce identical behavior without creating a new thread.
   
   Indeed, the code still exhibits `catch (Throwable ...)` behavior, but we delegate the handling of non-`Exception` throwables to the JDK rather than putting a plaster on every new hole we discover.
   
   > The `catch Throwable` occurs outside log4j2, in the disruptor implementation:
   > 
   > https://github.com/LMAX-Exchange/disruptor/blob/c1ea0d4a6d6144d338a9ff8cb0d46f2a7038b968/src/main/java/com/lmax/disruptor/BatchEventProcessor.java#L185-L190
   
   Really ~weird~ interesting... I still think `catch (Throwable ...)` is a time bomb waiting to be exploited by a bug report. You are handling `OutOfMemoryError`s, `ThreadDeath`s, etc. I will try poking somebody from Disruptor to comment on this.
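
The respawn-on-failure approach defended in this comment can be sketched with the JDK alone. This is an illustration under stated assumptions, not the PR's implementation: `RespawningWorker`, `spawn`, and `spawnCount` are hypothetical names, and `Runnable` tasks stand in for log events being forwarded.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class RespawningWorker {

    private final BlockingQueue<Runnable> tasks;

    private final AtomicInteger spawnCount = new AtomicInteger();

    RespawningWorker(final BlockingQueue<Runnable> tasks) {
        this.tasks = tasks;
    }

    // Starts a worker thread. There is no catch (Throwable ...) in the loop:
    // if a task throws, the thread dies and the JDK invokes the uncaught
    // exception handler, which starts a replacement thread on the same queue.
    void spawn() {
        final Thread thread = new Thread(this::runTasks, "worker-" + spawnCount.incrementAndGet());
        thread.setDaemon(true);
        thread.setUncaughtExceptionHandler((deadThread, error) -> spawn());
        thread.start();
    }

    private void runTasks() {
        try {
            while (true) {
                tasks.take().run();
            }
        } catch (final InterruptedException ignored) {
            // Restore the interrupted flag and let the thread finish normally.
            Thread.currentThread().interrupt();
        }
    }

    // Number of threads started so far, including replacements.
    int spawnCount() {
        return spawnCount.get();
    }
}
```

Here the failed task is lost and one thread creation is paid per failure, which is the cost raised earlier in the thread; in exchange, processing never continues on a thread that has just thrown an `Error`.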





[GitHub] [logging-log4j2] vy commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553405665



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {

Review comment:
       For the record, with `forwardOne()` I was trying to stick to the Spring Data conventions: `findOne()`, `findAll()`, etc. Further, I thought the names rhyme better in the code: `forwardAll()`, `forwardRemaining()`, and `forwardOne()`. @garydgregory, do you still want me to rename it to `forward()`?







[GitHub] [logging-log4j2] vy merged pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy merged pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452


   





[GitHub] [logging-log4j2] vy commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553403880



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantiation)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.
+            }
+        }
+
+        // Fallback to the error appender if none has succeeded so far.
+        if (!succeeded && errorAppender != null) {
+            try {
+                errorAppender.callAppender(event);
+            } catch (final Throwable ignored) {
+                // If the error appender also fails, there is nothing further
+                // we can do about it.
+            }
+        }
+
+    }
+
+    void stop(final long timeoutMillis) throws InterruptedException {
+
+        // Mark the completion, if necessary.
+        synchronized (this) {
+            if (!stopped) {
+                stopped = true;
+                LOGGER.trace("{} is signaled to stop.", getName());
+            }
+        }
+
+        // There is a slight chance that the thread is not started yet, wait for
+        // it to run. Otherwise, interrupt+join might block.
+        // noinspection StatementWithEmptyBody
+        while (Thread.State.NEW.equals(getState()));

Review comment:
       Yes. If `stop()` is called right after `start()`, there is a slight chance that `stop()` proceeds even though the thread has not started yet, and consequently interrupt+join blocks. Without this line, `AsyncAppenderExceptionHandlingTest` fails randomly, though rarely, when run manually.
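
One way to sidestep this start/stop race without spinning on `Thread.State.NEW` is to signal shutdown through the queue itself, with a sentinel element, rather than through `interrupt()`. The sketch below is an assumption-laden illustration, not the PR's code: `SentinelStopSketch`, `STOP`, and `runAndStop` are hypothetical names, and plain strings stand in for log events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration; not part of log4j2.
class SentinelStopSketch {

    // A distinct instance compared by identity, so an ordinary "STOP"
    // string in the queue is never mistaken for the sentinel.
    private static final String STOP = new String("STOP");

    // Handles all events, then stops. The stop signal may be enqueued
    // before or after the worker starts; the outcome is the same because
    // the signal waits in the queue until the worker reads it.
    static List<String> runAndStop(List<String> events, boolean stopBeforeStart) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(events);
        List<String> handled = new ArrayList<>();
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String event = queue.take();
                    if (event == STOP) {
                        return;
                    }
                    handled.add(event);
                }
            } catch (InterruptedException e) {
                // Restore the interrupted flag and exit.
                Thread.currentThread().interrupt();
            }
        });
        if (stopBeforeStart) {
            queue.add(STOP);
            worker.start();
        } else {
            worker.start();
            queue.add(STOP);
        }
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (worker.isAlive()) {
            throw new IllegalStateException("worker did not stop");
        }
        // join() guarantees the worker's writes to 'handled' are visible here.
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(runAndStop(List.of("a", "b"), true));
        System.out.println(runAndStop(List.of("a", "b"), false));
    }
}
```

Because the signal is a queue element, it cannot be lost if the worker has not started yet, which is the case the busy-wait above guards against.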







[GitHub] [logging-log4j2] vy commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r554435606



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventDispatcher extends Log4jThread {
+
+    private static final LogEvent STOP_EVENT = new Log4jLogEvent();
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventDispatcher(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventDispatcher-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        dispatchAll();
+        dispatchRemaining();
+    }
+
+    private void dispatchAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            if (event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void dispatchRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null || event == STOP_EVENT) {

Review comment:
       I think your argument makes sense. Replace `break` with `continue`.
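
The difference between `break` and `continue` when the drain loop meets the stop sentinel can be shown with a small sketch. The names `DrainSketch` and `drain` are hypothetical, and plain objects stand in for log events; only the control flow mirrors the loop quoted above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical illustration; not part of log4j2.
class DrainSketch {

    static final Object STOP_EVENT = new Object();

    // Drains the queue with the non-blocking poll(). When skipStopEvent is
    // true the sentinel is skipped (continue) and later events are still
    // dispatched; when false the loop ends at the sentinel (break).
    static List<Object> drain(Queue<Object> queue, boolean skipStopEvent) {
        List<Object> dispatched = new ArrayList<>();
        while (true) {
            Object event = queue.poll();
            if (event == null) {
                break; // queue is empty
            }
            if (event == STOP_EVENT) {
                if (skipStopEvent) {
                    continue;
                }
                break;
            }
            dispatched.add(event);
        }
        return dispatched;
    }

    public static void main(String[] args) {
        Queue<Object> queue = new ArrayDeque<>(Arrays.<Object>asList("a", STOP_EVENT, "b"));
        System.out.println(drain(queue, true));
    }
}
```

With `break`, any event enqueued after the sentinel is left in the queue; with `continue`, it is still dispatched.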







[GitHub] [logging-log4j2] vy commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553432863



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantiation)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.
+            }
+        }
+
+        // Fallback to the error appender if none has succeeded so far.
+        if (!succeeded && errorAppender != null) {
+            try {
+                errorAppender.callAppender(event);
+            } catch (final Throwable ignored) {
+                // If the error appender also fails, there is nothing further
+                // we can do about it.
+            }
+        }
+
+    }
+
+    void stop(final long timeoutMillis) throws InterruptedException {
+
+        // Mark the completion, if necessary.
+        synchronized (this) {
+            if (!stopped) {
+                stopped = true;
+                LOGGER.trace("{} is signaled to stop.", getName());
+            }
+        }
+
+        // There is a slight chance that the thread is not started yet, wait for
+        // it to run. Otherwise, interrupt+join might block.
+        // noinspection StatementWithEmptyBody
+        while (Thread.State.NEW.equals(getState()));
+
+        // Interrupt the thread. (Note that there is neither a check on
+        // "stopped", nor a synchronization here. It is okay to interrupt
+        // concurrently and/or multiple times.)
+        interrupt();

Review comment:
       Makes sense. Will do.







[GitHub] [logging-log4j2] carterkozak commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553368134



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantiation)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.
+            }
+        }
+
+        // Fallback to the error appender if none has succeeded so far.
+        if (!succeeded && errorAppender != null) {
+            try {
+                errorAppender.callAppender(event);
+            } catch (final Throwable ignored) {
+                // If the error appender also fails, there is nothing further
+                // we can do about it.
+            }
+        }
+
+    }
+
+    void stop(final long timeoutMillis) throws InterruptedException {
+
+        // Mark the completion, if necessary.
+        synchronized (this) {
+            if (!stopped) {
+                stopped = true;
+                LOGGER.trace("{} is signaled to stop.", getName());
+            }
+        }
+
+        // There is a slight chance that the thread is not started yet, wait for
+        // it to run. Otherwise, interrupt+join might block.
+        // noinspection StatementWithEmptyBody
+        while (Thread.State.NEW.equals(getState()));

Review comment:
       Is this still necessary?







[GitHub] [logging-log4j2] carterkozak commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553434123



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {

Review comment:
       `dispatcher` or `handler`. No strong opinion, I'm terrible at naming.







[GitHub] [logging-log4j2] vy commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553422956



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {

Review comment:
       Renamed to `forward()`.







[GitHub] [logging-log4j2] carterkozak commented on pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#issuecomment-752268188


   > Then the hell broke loose.
   
   I don't think it's that bad! We fixed a problem for some users, and caused an issue for others. I'd consider the behavior in 2.14 an improvement over previous releases, but I agree that we should allow ThreadDeath to propagate in one way or another.
   
   > How are we gonna handle ThreadDeath in this case?
   
   In my proposal above, the AsyncAppender would catch it and warn/ignore as it catches all throwables, but the AppenderControl would allow ThreadDeath to propagate, bringing back 2.13.x behavior. The difference is that the AppenderControl can throw errors again, but background threads must be robust against them. This PR makes the AsyncAppender robust against Error by transitioning to another thread, but I'm not sure why we need to change threads instead of allowing the original thread to continue.
   
   > In summary, I am really reluctant to add any kind of `catch (Throwable ...)` clauses.
   
   I think the reason this strikes me as odd is that this implementation is already using a `catch (Throwable ...)`, just without writing it directly. We're using an uncaught exception handler to catch `Throwable`, handle it by logging to the StatusLogger, and start another thread. If we explicitly write the try/catch instead, we can produce identical behavior without creating a new thread.
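
For illustration, here is a minimal sketch of the explicit try/catch alternative described above. It is not the PR's code: `ResilientWorkerSketch`, `drain`, `handle`, and the `STOP` sentinel are hypothetical names, strings stand in for log events, and a simulated `Error` stands in for an appender failure. The point is only that the loop reports the failure and keeps running on the same thread.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a background dispatcher thread; not the PR's class.
final class ResilientWorkerSketch {

    // Sentinel compared by identity, so a regular "STOP" item cannot match it.
    private static final String STOP = new String("STOP");

    /** Processes all items on a single worker thread and returns the ones handled successfully. */
    static List<String> drain(final List<String> items) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>(items);
        queue.add(STOP);
        final List<String> handled = new CopyOnWriteArrayList<>();
        final Thread worker = new Thread(() -> {
            while (true) {
                final String item;
                try {
                    item = queue.take();
                } catch (final InterruptedException ignored) {
                    // Restore the interrupted flag and exit.
                    Thread.currentThread().interrupt();
                    return;
                }
                if (item == STOP) {
                    return;
                }
                try {
                    handle(item, handled);
                } catch (final Throwable error) {
                    // Explicit counterpart of an uncaught exception handler:
                    // report the failure and continue on the same thread.
                    System.err.println("failed to handle " + item + ": " + error);
                }
            }
        }, "resilient-worker");
        worker.start();
        try {
            worker.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled;
    }

    // Simulates an appender call that throws an Error for some events.
    private static void handle(final String item, final List<String> handled) {
        if (item.startsWith("bad")) {
            throw new Error("simulated appender failure for " + item);
        }
        handled.add(item);
    }

    public static void main(final String[] args) {
        System.out.println(drain(List.of("a", "bad-1", "b"))); // prints [a, b]
    }
}
```

Whether swallowing every `Throwable` this way is desirable is exactly what the thread is debating; the sketch only shows that no second thread is required to survive a failure.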
   
   > Any code pointers?
   
   The `catch Throwable` occurs outside log4j2, in the disruptor implementation:
   
   https://github.com/LMAX-Exchange/disruptor/blob/c1ea0d4a6d6144d338a9ff8cb0d46f2a7038b968/src/main/java/com/lmax/disruptor/BatchEventProcessor.java#L185-L190
   
   I hope I'm not coming off too strong here, I really appreciate the work you've put into fixing this bug!





[GitHub] [logging-log4j2] carterkozak commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553366996



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {

Review comment:
       Oops, I misread your comment. I think it's odd that this object is both a thread, and called a forwarder. I think that made sense before, but the background thread is a receiver, and this provides functionality to submit events. Naming things is one of the two hardest problems in CS, and I don't have better alternatives in mind.
   
   To be clear I'm not suggesting it must be renamed, just thinking aloud.







[GitHub] [logging-log4j2] vy commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r554019016



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantiation)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.
+            }
+        }
+
+        // Fallback to the error appender if none has succeeded so far.
+        if (!succeeded && errorAppender != null) {
+            try {
+                errorAppender.callAppender(event);
+            } catch (final Throwable ignored) {
+                // If the error appender also fails, there is nothing further
+                // we can do about it.
+            }
+        }
+
+    }
+
+    void stop(final long timeoutMillis) throws InterruptedException {
+
+        // Mark the completion, if necessary.
+        synchronized (this) {
+            if (!stopped) {
+                stopped = true;
+                LOGGER.trace("{} is signaled to stop.", getName());
+            }
+        }
+
+        // There is a slight chance that the thread is not started yet, wait for
+        // it to run. Otherwise, interrupt+join might block.
+        // noinspection StatementWithEmptyBody
+        while (Thread.State.NEW.equals(getState()));

Review comment:
       Let's get it all done here. I really want to get this merged and poke Ralph for 2.14.1.







[GitHub] [logging-log4j2] vy commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553404772



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {

Review comment:
       @garydgregory, okay, I will rename it to `forward()`.
   
   @carterkozak, what about *dispatcher*?







[GitHub] [logging-log4j2] carterkozak commented on pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#issuecomment-752151311


   I think the only way this can happen is if someone calls [Thread.stop()](https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#stop--), so we must consider the intent when that occurs. Is it accidental, where the AsyncAppender interaction is collateral damage, or is the system attempting to stop all threads? In the former case we may want to avoid breaking the logger, but in the latter we would not want to restart the background thread.
   
   If we're restarting the background thread, why not simply catch ThreadDeath to avoid thread churn and potential bugs that result from the indirection?
   
   Based on the original request when we began catching Error, I don't think the caller was stopping background threads (or using asynchronous logging at all) but rather calling Thread.stop on an application thread that happened to log. Using synchronous logging it would swallow the ThreadDeath error silently and allow the application thread to continue. I think we can allow ThreadDeath to be propagated, and even kill background threads -- if that's what the user wants and the security manager allows it, on their own heads be it!
   
   What do you think?
   
   Also note that, as written, this PR only applies to the AsyncAppender, not the disruptor-based fully async AsyncLoggerContextSelector or mixed sync/async logging.





[GitHub] [logging-log4j2] vy commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553421839



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantiation)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable ignored) {
+                // If no appender is successful, the error appender will get it.
+            }
+        }
+
+        // Fallback to the error appender if none has succeeded so far.
+        if (!succeeded && errorAppender != null) {
+            try {
+                errorAppender.callAppender(event);
+            } catch (final Throwable ignored) {
+                // If the error appender also fails, there is nothing further
+                // we can do about it.
+            }
+        }
+
+    }
+
+    void stop(final long timeoutMillis) throws InterruptedException {
+
+        // Mark the completion, if necessary.
+        synchronized (this) {

Review comment:
       Doh! :facepalm: A relic from the refactoring. Fixed by replacing with an `AtomicBoolean`.
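
As a rough illustration of that replacement, here is a minimal sketch of a stop flag built on `AtomicBoolean` instead of a `synchronized` block around a `volatile` field. `StopSignalSketch`, `signalStop`, and `isStopped` are hypothetical names, not the PR's code. `compareAndSet(false, true)` makes the false-to-true transition atomic, so only one caller observes that it performed the stop.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the dispatcher's stop flag; not the PR's code.
final class StopSignalSketch {

    private final AtomicBoolean stopped = new AtomicBoolean(false);

    /** Returns true only for the single caller that actually performs the transition. */
    boolean signalStop() {
        // Atomically flips false -> true; no synchronized block is needed.
        return stopped.compareAndSet(false, true);
    }

    boolean isStopped() {
        return stopped.get();
    }

    public static void main(final String[] args) {
        final StopSignalSketch signal = new StopSignalSketch();
        System.out.println(signal.signalStop()); // prints true: first call wins
        System.out.println(signal.signalStop()); // prints false: already stopped
    }
}
```

The caller that gets `true` back can safely do the one-time work, such as logging that the thread was signaled to stop.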







[GitHub] [logging-log4j2] carterkozak commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r554001855



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventDispatcher extends Log4jThread {
+
+    private static final LogEvent STOP_EVENT = new Log4jLogEvent();
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventDispatcher(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventDispatcher-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        dispatchAll();
+        dispatchRemaining();
+    }
+
+    private void dispatchAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            if (event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void dispatchRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null || event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void dispatch(final LogEvent event) {
+
+        // Dispatch the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantiation)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable error) {
+                // If no appender is successful, the error appender will get it.
+                // It is okay to simply log it here.
+                if (LOGGER.isTraceEnabled()) {
+                    final String message = String.format(
+                            "%s has failed to call appender %s",
+                            getName(), control.getAppenderName());
+                    LOGGER.trace(message, error);

Review comment:
       nit: I think we can use `{}` parameter placeholders instead of String.format. Same on line 132.







[GitHub] [logging-log4j2] carterkozak commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r553364736



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventForwarder.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventForwarder extends Log4jThread {
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private volatile boolean stopped;
+
+    AsyncAppenderEventForwarder(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventForwarder-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stopped = false;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        forwardAll();
+        forwardRemaining();
+    }
+
+    private void forwardAll() {
+        while (!stopped) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void forwardRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            forwardOne(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void forwardOne(final LogEvent event) {
+
+        // Forward the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantion)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);

Review comment:
       I suppose we could convert the list to an array on creation and get the best of both worlds :-)
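
A minimal sketch of the array idea, not the code from the PR. `Target` and `ArrayBackedDispatcher` are hypothetical names standing in for `AppenderControl` and the dispatcher thread. The list is copied into an array once in the constructor, so the loop can use for-each syntax without allocating an `Iterator` per event.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for AppenderControl; only what the sketch needs.
interface Target {
    void call(String event);
}

// Hypothetical dispatcher, not the class from the PR.
final class ArrayBackedDispatcher {

    private final Target[] targets;

    ArrayBackedDispatcher(final List<Target> targets) {
        // Copy once on creation; later changes to the list are not observed.
        this.targets = targets.toArray(new Target[0]);
    }

    // Returns how many targets accepted the event.
    int dispatch(final String event) {
        int succeeded = 0;
        // for-each over an array compiles to an indexed loop: no Iterator.
        for (final Target target : targets) {
            try {
                target.call(event);
                succeeded++;
            } catch (final RuntimeException error) {
                // One failing target must not stop the others.
            }
        }
        return succeeded;
    }

    public static void main(final String[] args) {
        final List<String> seen = new ArrayList<>();
        final List<Target> targets = new ArrayList<>();
        targets.add(seen::add);
        targets.add(event -> { throw new IllegalStateException("boom"); });
        final ArrayBackedDispatcher dispatcher = new ArrayBackedDispatcher(targets);
        System.out.println(dispatcher.dispatch("E") + " target(s) succeeded, seen=" + seen);
    }
}
```

The trade-off is that the array is a snapshot: appenders added to the original list after construction are not seen by the dispatcher.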







[GitHub] [logging-log4j2] carterkozak commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r554036911



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventDispatcher extends Log4jThread {
+
+    private static final LogEvent STOP_EVENT = new Log4jLogEvent();
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventDispatcher(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventDispatcher-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        dispatchAll();
+        dispatchRemaining();
+    }
+
+    private void dispatchAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            if (event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void dispatchRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null || event == STOP_EVENT) {

Review comment:
       > Shouldn't we simply discard anything that is sent after the stop() signal?
   
   I think this depends on how we define "after" and the "stop signal" ;-)
   
   Consider:
   
   1. Thread T1 calls asyncAppender.append with event E.
   2. T1 checks whether the appender is running; the check succeeds.
   3. Thread T2 calls asyncAppender.stop().
   4. T2 sets stopped = true.
   5. T2 submits the STOP_EVENT.
   6. T1 submits event E to the queue.
   
   The append call for E began before stop, but the STOP_EVENT reaches the queue first.
   
   Ideally, if the event is ignored, we would emit a status message saying that the event was not recorded because the appender was stopped. Events shouldn't silently disappear, and we shouldn't leave objects (with references to potentially large user-provided parameters) sitting in the queue on the heap.
   That said, continuing to read from the queue after STOP_EVENT doesn't guarantee that we solve the problem (offer may not have been invoked yet!); it only makes the race less likely.
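
The interleaving can be replayed deterministically on a single thread. The sketch below is an illustration under assumptions, not the AsyncAppender code: `StopRaceSketch` and its methods are hypothetical, and `append` is split into its check and its enqueue so the steps can be ordered by hand. It shows both outcomes: a late event that is already in the queue when the drain runs is recovered, while one offered after the drain stays in the queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical miniature of the appender; all names are illustrative.
final class StopRaceSketch {

    // Sentinel compared by identity, like STOP_EVENT in the diff.
    static final Object STOP = new Object();

    final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    final AtomicBoolean stopped = new AtomicBoolean(false);

    // First half of append(): the running check.
    boolean isRunning() {
        return !stopped.get();
    }

    // Second half of append(): the enqueue, which may happen arbitrarily later.
    void enqueue(final Object event) {
        queue.offer(event);
    }

    void stop() {
        stopped.set(true);
        queue.offer(STOP);
    }

    // Consumer side: non-blocking drain that skips the sentinel.
    // Returns the number of real events it saw.
    int drain() {
        int count = 0;
        Object event;
        while ((event = queue.poll()) != null) {
            if (event != STOP) {
                count++;
            }
        }
        return count;
    }

    // E is enqueued after STOP but before the drain: it is recovered.
    static int lateButVisible() {
        final StopRaceSketch appender = new StopRaceSketch();
        final boolean running = appender.isRunning(); // T1 passes the check
        appender.stop();                              // T2 stops the appender
        if (running) {
            appender.enqueue("E");                    // T1 enqueues E
        }
        return appender.drain();                      // events recovered
    }

    // E is enqueued after the drain has finished: it stays in the queue.
    static int lateAndInvisible() {
        final StopRaceSketch appender = new StopRaceSketch();
        final boolean running = appender.isRunning(); // T1 passes the check
        appender.stop();                              // T2 stops the appender
        appender.drain();                             // consumer finishes
        if (running) {
            appender.enqueue("E");                    // T1 enqueues E too late
        }
        return appender.queue.size();                 // events left stranded
    }

    public static void main(final String[] args) {
        System.out.println("recovered by drain: " + lateButVisible());
        System.out.println("stranded in queue:  " + lateAndInvisible());
    }
}
```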







[GitHub] [logging-log4j2] vy commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
vy commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r554017030



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventDispatcher extends Log4jThread {
+
+    private static final LogEvent STOP_EVENT = new Log4jLogEvent();
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventDispatcher(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventDispatcher-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        dispatchAll();
+        dispatchRemaining();
+    }
+
+    private void dispatchAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            if (event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void dispatchRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null || event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    void dispatch(final LogEvent event) {
+
+        // Dispatch the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantion)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable error) {
+                // If no appender is successful, the error appender will get it.
+                // It is okay to simply log it here.
+                if (LOGGER.isTraceEnabled()) {
+                    final String message = String.format(
+                            "%s has failed to call appender %s",
+                            getName(), control.getAppenderName());
+                    LOGGER.trace(message, error);

Review comment:
       Didn't know such a thing existed at all! :open_mouth: Fixed.







[GitHub] [logging-log4j2] carterkozak commented on a change in pull request #452: LOG4J2-2972 Respawn AsyncAppender thread on failures.

Posted by GitBox <gi...@apache.org>.
carterkozak commented on a change in pull request #452:
URL: https://github.com/apache/logging-log4j2/pull/452#discussion_r554009348



##########
File path: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventDispatcher extends Log4jThread {
+
+    private static final LogEvent STOP_EVENT = new Log4jLogEvent();
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventDispatcher(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventDispatcher-" + THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        dispatchAll();
+        dispatchRemaining();
+    }
+
+    private void dispatchAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            if (event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void dispatchRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null || event == STOP_EVENT) {

Review comment:
       The previous implementation would still process events that were submitted after the sentinel but before further inputs started to result in a throw. I think we want a `continue` on `event == STOP_EVENT` rather than `break`.
   
   It's not clear that this will make a difference in practice. Continuing to poll after STOP_EVENT doesn't guarantee that we record all such events; it only picks up those already visible in the queue.
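
To make the `break` versus `continue` difference concrete, here is a small sketch under assumptions: `DrainSketch`, its `STOP` sentinel, and the `skipSentinel` flag are hypothetical and exist only to compare the two loop shapes on the same queue contents.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical comparison of the two drain strategies; not the PR's code.
final class DrainSketch {

    // Sentinel compared by identity, like STOP_EVENT in the diff.
    static final Object STOP = new Object();

    // Drains the queue without blocking and returns the dispatched events.
    // skipSentinel = true  models `continue` on the sentinel,
    // skipSentinel = false models `break` on the sentinel.
    static List<Object> drain(final Queue<Object> queue, final boolean skipSentinel) {
        final List<Object> dispatched = new ArrayList<>();
        Object event;
        while ((event = queue.poll()) != null) {
            if (event == STOP) {
                if (skipSentinel) {
                    continue;
                }
                break;
            }
            dispatched.add(event);
        }
        return dispatched;
    }

    // Queue contents: A, then the sentinel, then a late event E.
    static Queue<Object> sample() {
        final Queue<Object> queue = new ArrayDeque<>();
        queue.add("A");
        queue.add(STOP);
        queue.add("E");
        return queue;
    }

    public static void main(final String[] args) {
        System.out.println("break:    " + drain(sample(), false));
        System.out.println("continue: " + drain(sample(), true));
    }
}
```

With `break`, the late event E is left behind in the queue; with `continue`, it is dispatched, but only because it was already visible when the loop reached it.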



