You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by rp...@apache.org on 2016/09/08 14:50:35 UTC

[06/50] [abbrv] logging-log4j2 git commit: Propagate and use timeout values from Configurator.shutdown(LoggerContext, long, TimeUnit).

Propagate and use timeout values from
Configurator.shutdown(LoggerContext,long,TimeUnit).

Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/17046951
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/17046951
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/17046951

Branch: refs/heads/LOG4J2-1349-gcfree-threadcontext
Commit: 170469514b374eb5a5a33bde6936162fd608f097
Parents: 56a688f
Author: Gary Gregory <gg...@apache.org>
Authored: Tue Sep 6 05:22:19 2016 -0400
Committer: Gary Gregory <gg...@apache.org>
Committed: Tue Sep 6 05:22:19 2016 -0400

----------------------------------------------------------------------
 .../logging/log4j/core/AbstractLifeCycle.java   |   2 +-
 .../log4j/core/appender/AbstractManager.java    |  13 +-
 .../appender/AbstractOutputStreamAppender.java  |   2 +-
 .../core/appender/AbstractWriterAppender.java   |   2 +-
 .../core/appender/OutputStreamManager.java      |   3 +-
 .../log4j/core/appender/WriterManager.java      |   3 +-
 .../appender/db/AbstractDatabaseAppender.java   |   2 +-
 .../appender/db/AbstractDatabaseManager.java    |   3 +-
 .../log4j/core/appender/mom/JmsAppender.java    |   2 +-
 .../log4j/core/appender/mom/JmsManager.java     |   6 +-
 .../appender/mom/jeromq/JeroMqAppender.java     |   2 +-
 .../core/appender/mom/jeromq/JeroMqManager.java |   3 +-
 .../core/appender/mom/kafka/KafkaAppender.java  |   2 +-
 .../core/appender/mom/kafka/KafkaManager.java   |  17 +-
 .../log4j/core/filter/AbstractFilterable.java   |  12 +-
 .../logging/log4j/core/net/JndiManager.java     |   3 +-
 .../log4j/core/net/server/JmsServer.java        |   2 +-
 .../db/AbstractDatabaseAppenderTest.java        |   4 +-
 .../db/AbstractDatabaseManagerTest.java         |   2 +-
 .../log4j/flume/appender/FlumeAppender.java     |   2 +-
 .../log4j/flume/appender/FlumeAvroManager.java  | 666 +++++++++----------
 .../flume/appender/FlumeEmbeddedManager.java    |   3 +-
 .../flume/appender/FlumePersistentManager.java  |  16 +-
 23 files changed, 386 insertions(+), 386 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/AbstractLifeCycle.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/AbstractLifeCycle.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/AbstractLifeCycle.java
index 3fb15bf..33aabd1 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/AbstractLifeCycle.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/AbstractLifeCycle.java
@@ -28,7 +28,7 @@ import org.apache.logging.log4j.status.StatusLogger;
  */
 public class AbstractLifeCycle implements LifeCycle {
 
-    public static final int DEFAULT_STOP_TIMEOUT = -1;
+    public static final int DEFAULT_STOP_TIMEOUT = 0;
     public static final TimeUnit DEFAULT_STOP_TIMEUNIT = TimeUnit.MILLISECONDS;
 
     /**

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractManager.java
index 0a036a0..dfe7ffd 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractManager.java
@@ -18,11 +18,13 @@ package org.apache.logging.log4j.core.appender;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.AbstractLifeCycle;
 import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.message.Message;
 import org.apache.logging.log4j.status.StatusLogger;
@@ -69,20 +71,23 @@ public abstract class AbstractManager implements AutoCloseable {
      */
     @Override
     public void close() {
+        stop(AbstractLifeCycle.DEFAULT_STOP_TIMEOUT, AbstractLifeCycle.DEFAULT_STOP_TIMEUNIT);
+    }
+
+    public void stop(final long timeout, final TimeUnit timeUnit) {
         LOCK.lock();
         try {
             --count;
             if (count <= 0) {
                 MAP.remove(name);
                 LOGGER.debug("Shutting down {} {}", this.getClass().getSimpleName(), getName());
-                releaseSub();
+                releaseSub(timeout, timeUnit);
             }
         } finally {
             LOCK.unlock();
         }
     }
 
-
     /**
      * Retrieves a Manager if it has been previously created or creates a new Manager.
      * @param name The name of the Manager to retrieve.
@@ -136,8 +141,10 @@ public abstract class AbstractManager implements AutoCloseable {
     /**
      * May be overridden by Managers to perform processing while the Manager is being released and the
      * lock is held.
+     * @param timeout TODO
+     * @param timeUnit TODO
      */
-    protected void releaseSub() {
+    protected void releaseSub(long timeout, TimeUnit timeUnit) {
         // This default implementation does nothing.
     }
 

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractOutputStreamAppender.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractOutputStreamAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractOutputStreamAppender.java
index 98030fc..4d16be8 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractOutputStreamAppender.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractOutputStreamAppender.java
@@ -139,7 +139,7 @@ public abstract class AbstractOutputStreamAppender<M extends OutputStreamManager
     @Override
     protected boolean stop(final long timeout, final TimeUnit timeUnit, final boolean changeLifeCycleState) {
         super.stop(timeout, timeUnit, changeLifeCycleState);
-        manager.close();
+        manager.stop(timeout, timeUnit);
         if (changeLifeCycleState) {
             setStopped();
         }

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractWriterAppender.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractWriterAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractWriterAppender.java
index 8b85588..7e1d46b 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractWriterAppender.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractWriterAppender.java
@@ -119,7 +119,7 @@ public abstract class AbstractWriterAppender<M extends WriterManager> extends Ab
     public boolean stop(final long timeout, final TimeUnit timeUnit) {
         setStopping();
         super.stop(timeout, timeUnit, false);
-        manager.close();
+        manager.stop(timeout, timeUnit);
         setStopped();
         return true;
     }

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/OutputStreamManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/OutputStreamManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/OutputStreamManager.java
index 81d5027..f42cc72 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/OutputStreamManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/OutputStreamManager.java
@@ -21,6 +21,7 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.logging.log4j.core.Layout;
 import org.apache.logging.log4j.core.LoggerContext;
@@ -130,7 +131,7 @@ public class OutputStreamManager extends AbstractManager implements ByteBufferDe
      * Default hook to write footer during close.
      */
     @Override
-    public void releaseSub() {
+    public void releaseSub(long timeout, TimeUnit timeUnit) {
         writeFooter();
         closeOutputStream();
     }

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/WriterManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/WriterManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/WriterManager.java
index cee99e8..5a95794 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/WriterManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/WriterManager.java
@@ -18,6 +18,7 @@ package org.apache.logging.log4j.core.appender;
 
 import java.io.IOException;
 import java.io.Writer;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.logging.log4j.core.StringLayout;
 
@@ -98,7 +99,7 @@ public class WriterManager extends AbstractManager {
      * Default hook to write footer during close.
      */
     @Override
-    public void releaseSub() {
+    public void releaseSub(long timeout, TimeUnit timeUnit) {
         writeFooter();
         closeWriter();
     }

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseAppender.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseAppender.java
index 803e0cc..85a3a67 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseAppender.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseAppender.java
@@ -95,7 +95,7 @@ public abstract class AbstractDatabaseAppender<T extends AbstractDatabaseManager
         setStopping();
         super.stop(timeout, timeUnit, false);
         if (this.getManager() != null) {
-            this.getManager().close();
+            this.getManager().stop(timeout, timeUnit);
         }
         setStopped();
         return true;

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManager.java
index 4457008..26f0070 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManager.java
@@ -19,6 +19,7 @@ package org.apache.logging.log4j.core.appender.db;
 
 import java.io.Flushable;
 import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.logging.log4j.core.LogEvent;
 import org.apache.logging.log4j.core.appender.AbstractManager;
@@ -170,7 +171,7 @@ public abstract class AbstractDatabaseManager extends AbstractManager implements
     }
 
     @Override
-    public final void releaseSub() {
+    public final void releaseSub(long timeout, TimeUnit timeUnit) {
         this.shutdown();
     }
 

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/JmsAppender.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/JmsAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/JmsAppender.java
index 442243b..8e21de8 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/JmsAppender.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/JmsAppender.java
@@ -74,7 +74,7 @@ public class JmsAppender extends AbstractAppender {
     public boolean stop(final long timeout, final TimeUnit timeUnit) {
         setStopping();
         super.stop(timeout, timeUnit, false);
-        this.manager.close();
+        this.manager.stop(timeout, timeUnit);
         setStopped();
         return true;
     }

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/JmsManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/JmsManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/JmsManager.java
index ff04c5a..2d8dfda 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/JmsManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/JmsManager.java
@@ -18,6 +18,8 @@
 package org.apache.logging.log4j.core.appender.mom;
 
 import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
@@ -123,7 +125,7 @@ public class JmsManager extends AbstractManager {
     }
 
     @Override
-    protected void releaseSub() {
+    protected void releaseSub(long timeout, TimeUnit timeUnit) {
         try {
             this.session.close();
         } catch (final JMSException ignored) {
@@ -134,7 +136,7 @@ public class JmsManager extends AbstractManager {
         } catch (final JMSException ignored) {
             // ignore
         }
-        this.jndiManager.close();
+        this.jndiManager.stop(timeout, timeUnit);
     }
 
     private static class JmsConfiguration {

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
index 0f96a21..5a60653 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
@@ -153,7 +153,7 @@ public final class JeroMqAppender extends AbstractAppender {
     public boolean stop(final long timeout, final TimeUnit timeUnit) {
         setStopping();
         super.stop(timeout, timeUnit, false);
-        manager.close();
+        manager.stop(timeout, timeUnit);
         setStopped();
         return true;
     }

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
index 6f106be..c460d93 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
@@ -19,6 +19,7 @@ package org.apache.logging.log4j.core.appender.mom.jeromq;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.core.appender.AbstractManager;
@@ -104,7 +105,7 @@ public class JeroMqManager extends AbstractManager {
     }
 
     @Override
-    protected void releaseSub() {
+    protected void releaseSub(long timeout, TimeUnit timeUnit) {
         publisher.close();
     }
 

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
index 6f8e872..bee9437 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
@@ -104,7 +104,7 @@ public final class KafkaAppender extends AbstractAppender {
     public boolean stop(final long timeout, final TimeUnit timeUnit) {
         setStopping();
         super.stop(timeout, timeUnit, false);
-        manager.close();
+        manager.stop(timeout, timeUnit);
         setStopped();
         return true;
     }

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
index 35aafb7..58c4ba2 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
@@ -56,22 +56,9 @@ public class KafkaManager extends AbstractManager {
     }
 
     @Override
-    public void releaseSub() {
+    public void releaseSub(final long timeout, final TimeUnit timeUnit) {
         if (producer != null) {
-            // This thread is a workaround for this Kafka issue: https://issues.apache.org/jira/browse/KAFKA-1660
-            final Runnable task = new Runnable() {
-                @Override
-                public void run() {
-                    if (producer != null) {
-                        producer.close();
-                    }
-                }
-            };
-            try {
-                getLoggerContext().submitDaemon(task).get(timeoutMillis, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException | ExecutionException | TimeoutException e) {
-                // ignore
-            }
+            producer.close(timeout, timeUnit);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/filter/AbstractFilterable.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/filter/AbstractFilterable.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/filter/AbstractFilterable.java
index 81b8ea9..92b8eee 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/filter/AbstractFilterable.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/filter/AbstractFilterable.java
@@ -146,12 +146,7 @@ public abstract class AbstractFilterable extends AbstractLifeCycle implements Fi
      */
     @Override
     public boolean stop(final long timeout, final TimeUnit timeUnit) {
-        this.setStopping();
-        if (filter != null) {
-            filter.stop(timeout, timeUnit);
-        }
-        this.setStopped();
-        return true;
+        return stop(timeout, timeUnit, true);
     }
 
     /**
@@ -161,13 +156,14 @@ public abstract class AbstractFilterable extends AbstractLifeCycle implements Fi
         if (changeLifeCycleState) {
             this.setStopping();
         }
+        boolean stopped = true;
         if (filter != null) {
-            filter.stop(timeout, timeUnit);
+            stopped = filter.stop(timeout, timeUnit);
         }
         if (changeLifeCycleState) {
             this.setStopped();
         }
-        return true;
+        return stopped;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/net/JndiManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/net/JndiManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/net/JndiManager.java
index c413373..91f9bcb 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/net/JndiManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/net/JndiManager.java
@@ -18,6 +18,7 @@
 package org.apache.logging.log4j.core.net;
 
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import javax.naming.Context;
 import javax.naming.InitialContext;
@@ -111,7 +112,7 @@ public class JndiManager extends AbstractManager {
     }
 
     @Override
-    protected void releaseSub() {
+    protected void releaseSub(long timeout, TimeUnit timeUnit) {
         JndiCloser.closeSilently(this.context);
     }
 

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/net/server/JmsServer.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/net/server/JmsServer.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/net/server/JmsServer.java
index 887bcc5..15e0a3f 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/net/server/JmsServer.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/net/server/JmsServer.java
@@ -110,7 +110,7 @@ public class JmsServer extends LogEventListener implements MessageListener, Life
         } catch (final JMSException e) {
             LOGGER.debug("Exception closing {}", messageConsumer, e);
         }
-        jmsManager.close();
+        jmsManager.stop(timeout, timeUnit);
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseAppenderTest.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseAppenderTest.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseAppenderTest.java
index d7f955d..738b2d5 100644
--- a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseAppenderTest.java
+++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseAppenderTest.java
@@ -79,7 +79,7 @@ public class AbstractDatabaseAppenderTest {
 
         verify(this.manager, this.appender);
         reset(this.manager, this.appender);
-        this.manager.close();
+        this.manager.shutdownInternal();
         expectLastCall();
         replay(this.manager, this.appender);
 
@@ -111,7 +111,7 @@ public class AbstractDatabaseAppenderTest {
 
         verify(this.manager, this.appender, newManager);
         reset(this.manager, this.appender, newManager);
-        newManager.close();
+        newManager.shutdownInternal();
         expectLastCall();
         replay(this.manager, this.appender, newManager);
 

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManagerTest.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManagerTest.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManagerTest.java
index e462b31..36c89d2 100644
--- a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManagerTest.java
+++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManagerTest.java
@@ -92,7 +92,7 @@ public class AbstractDatabaseManagerTest {
         expectLastCall();
         replay(this.manager);
 
-        this.manager.releaseSub();
+        this.manager.releaseSub(-1, null);
         assertFalse("The manager should not be running anymore.", this.manager.isRunning());
     }
 

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
----------------------------------------------------------------------
diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
index 3e2cf10..bdad7e4 100644
--- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
+++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
@@ -110,7 +110,7 @@ public final class FlumeAppender extends AbstractAppender implements FlumeEventF
     public boolean stop(final long timeout, final TimeUnit timeUnit) {
         setStopping();
         super.stop(timeout, timeUnit, false);
-        manager.close();
+        manager.stop(timeout, timeUnit);
         setStopped();
         return true;
     }

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
----------------------------------------------------------------------
diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
index dd5dd96..61b2392 100644
--- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
+++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
@@ -1,333 +1,333 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache license, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the license for the specific language governing permissions and
- * limitations under the license.
- */
-package org.apache.logging.log4j.flume.appender;
-
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flume.Event;
-import org.apache.flume.api.RpcClient;
-import org.apache.flume.api.RpcClientFactory;
-import org.apache.logging.log4j.core.appender.AppenderLoggingException;
-import org.apache.logging.log4j.core.appender.ManagerFactory;
-
-/**
- * Manager for FlumeAvroAppenders.
- */
-public class FlumeAvroManager extends AbstractFlumeManager {
-
-    private static final int MAX_RECONNECTS = 3;
-    private static final int MINIMUM_TIMEOUT = 1000;
-
-    private static AvroManagerFactory factory = new AvroManagerFactory();
-
-    private final Agent[] agents;
-
-    private final int batchSize;
-
-    private final long delayNanos;
-    private final int delayMillis;
-
-    private final int retries;
-
-    private final int connectTimeoutMillis;
-
-    private final int requestTimeoutMillis;
-
-    private final int current = 0;
-
-    private RpcClient rpcClient = null;
-
-    private BatchEvent batchEvent = new BatchEvent();
-    private long nextSend = 0;
-
-    /**
-     * Constructor
-     * @param name The unique name of this manager.
-     * @param agents An array of Agents.
-     * @param batchSize The number of events to include in a batch.
-     * @param retries The number of times to retry connecting before giving up.
-     * @param connectTimeout The connection timeout in ms.
-     * @param requestTimeout The request timeout in ms.
-     *
-     */
-    protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
-                               final int delayMillis, final int retries, final int connectTimeout, final int requestTimeout) {
-        super(name);
-        this.agents = agents;
-        this.batchSize = batchSize;
-        this.delayMillis = delayMillis;
-        this.delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis);
-        this.retries = retries;
-        this.connectTimeoutMillis = connectTimeout;
-        this.requestTimeoutMillis = requestTimeout;
-        this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
-    }
-
-    /**
-     * Returns a FlumeAvroManager.
-     * @param name The name of the manager.
-     * @param agents The agents to use.
-     * @param batchSize The number of events to include in a batch.
-     * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
-     * @param retries The number of times to retry connecting before giving up.
-     * @param connectTimeoutMillis The connection timeout in ms.
-     * @param requestTimeoutMillis The request timeout in ms.
-     * @return A FlumeAvroManager.
-     */
-    public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, final int delayMillis,
-                                              final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
-        if (agents == null || agents.length == 0) {
-            throw new IllegalArgumentException("At least one agent is required");
-        }
-
-        if (batchSize <= 0) {
-            batchSize = 1;
-        }
-
-        final StringBuilder sb = new StringBuilder("FlumeAvro[");
-        boolean first = true;
-        for (final Agent agent : agents) {
-            if (!first) {
-                sb.append(',');
-            }
-            sb.append(agent.getHost()).append(':').append(agent.getPort());
-            first = false;
-        }
-        sb.append(']');
-        return getManager(sb.toString(), factory,
-                new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis, requestTimeoutMillis));
-    }
-
-    /**
-     * Returns the agents.
-     * @return The agent array.
-     */
-    public Agent[] getAgents() {
-        return agents;
-    }
-
-    /**
-     * Returns the index of the current agent.
-     * @return The index for the current agent.
-     */
-    public int getCurrent() {
-        return current;
-    }
-
-    public int getRetries() {
-        return retries;
-    }
-
-    public int getConnectTimeoutMillis() {
-        return connectTimeoutMillis;
-    }
-
-    public int getRequestTimeoutMillis() {
-        return requestTimeoutMillis;
-    }
-
-    public int getBatchSize() {
-        return batchSize;
-    }
-
-    public int getDelayMillis() {
-        return delayMillis;
-    }
-
-    public synchronized void send(final BatchEvent events) {
-        if (rpcClient == null) {
-            rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
-        }
-
-        if (rpcClient != null) {
-            try {
-                LOGGER.trace("Sending batch of {} events", events.getEvents().size());
-                rpcClient.appendBatch(events.getEvents());
-            } catch (final Exception ex) {
-                rpcClient.close();
-                rpcClient = null;
-                final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
-                    agents[current].getPort();
-                LOGGER.warn(msg, ex);
-                throw new AppenderLoggingException("No Flume agents are available");
-            }
-        }  else {
-            final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
-                agents[current].getPort();
-            LOGGER.warn(msg);
-            throw new AppenderLoggingException("No Flume agents are available");
-        }
-    }
-
-    @Override
-    public synchronized void send(final Event event)  {
-        if (batchSize == 1) {
-            if (rpcClient == null) {
-                rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
-            }
-
-            if (rpcClient != null) {
-                try {
-                    rpcClient.append(event);
-                } catch (final Exception ex) {
-                    rpcClient.close();
-                    rpcClient = null;
-                    final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
-                            agents[current].getPort();
-                    LOGGER.warn(msg, ex);
-                    throw new AppenderLoggingException("No Flume agents are available");
-                }
-            } else {
-                final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
-                        agents[current].getPort();
-                LOGGER.warn(msg);
-                throw new AppenderLoggingException("No Flume agents are available");
-            }
-        } else {
-            batchEvent.addEvent(event);
-            final int eventCount = batchEvent.getEvents().size();
-            if (eventCount == 1) {
-                nextSend = System.nanoTime() + delayNanos;
-            }
-            if (eventCount >= batchSize || System.nanoTime() >= nextSend) {
-                send(batchEvent);
-                batchEvent = new BatchEvent();
-            }
-        }
-    }
-
-    /**
-     * There is a very good chance that this will always return the first agent even if it isn't available.
-     * @param agents The list of agents to choose from
-     * @return The FlumeEventAvroServer.
-     */
-    private RpcClient connect(final Agent[] agents, int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
-        try {
-            final Properties props = new Properties();
-
-            props.put("client.type", "default_failover");
-
-            int agentCount = 1;
-            final StringBuilder sb = new StringBuilder();
-            for (final Agent agent : agents) {
-                if (sb.length() > 0) {
-                    sb.append(' ');
-                }
-                final String hostName = "host" + agentCount++;
-                props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort());
-                sb.append(hostName);
-            }
-            props.put("hosts", sb.toString());
-            if (batchSize > 0) {
-                props.put("batch-size", Integer.toString(batchSize));
-            }
-            if (retries > 1) {
-                if (retries > MAX_RECONNECTS) {
-                    retries = MAX_RECONNECTS;
-                }
-                props.put("max-attempts", Integer.toString(retries * agents.length));
-            }
-            if (requestTimeoutMillis >= MINIMUM_TIMEOUT) {
-                props.put("request-timeout", Integer.toString(requestTimeoutMillis));
-            }
-            if (connectTimeoutMillis >= MINIMUM_TIMEOUT) {
-                props.put("connect-timeout", Integer.toString(connectTimeoutMillis));
-            }
-            return RpcClientFactory.getInstance(props);
-        } catch (final Exception ex) {
-            LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
-            return null;
-        }
-    }
-
-    @Override
-    protected void releaseSub() {
-        if (rpcClient != null) {
-            try {
-                synchronized(this) {
-                    try {
-                        if (batchSize > 1 && batchEvent.getEvents().size() > 0) {
-                            send(batchEvent);
-                        }
-                    } catch (final Exception ex) {
-                        LOGGER.error("Error sending final batch: {}", ex.getMessage());
-                    }
-                }
-                rpcClient.close();
-            } catch (final Exception ex) {
-                LOGGER.error("Attempt to close RPC client failed", ex);
-            }
-        }
-        rpcClient = null;
-    }
-
-    /**
-     * Factory data.
-     */
-    private static class FactoryData {
-        private final String name;
-        private final Agent[] agents;
-        private final int batchSize;
-        private final int delayMillis;
-        private final int retries;
-        private final int conntectTimeoutMillis;
-        private final int requestTimeoutMillis;
-
-        /**
-         * Constructor.
-         * @param name The name of the Appender.
-         * @param agents The agents.
-         * @param batchSize The number of events to include in a batch.
-         */
-        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis,
-                final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
-            this.name = name;
-            this.agents = agents;
-            this.batchSize = batchSize;
-            this.delayMillis = delayMillis;
-            this.retries = retries;
-            this.conntectTimeoutMillis = connectTimeoutMillis;
-            this.requestTimeoutMillis = requestTimeoutMillis;
-        }
-    }
-
-    /**
-     * Avro Manager Factory.
-     */
-    private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
-
-        /**
-         * Create the FlumeAvroManager.
-         * @param name The name of the entity to manage.
-         * @param data The data required to create the entity.
-         * @return The FlumeAvroManager.
-         */
-        @Override
-        public FlumeAvroManager createManager(final String name, final FactoryData data) {
-            try {
-
-                return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis,
-                        data.retries, data.conntectTimeoutMillis, data.requestTimeoutMillis);
-            } catch (final Exception ex) {
-                LOGGER.error("Could not create FlumeAvroManager", ex);
-            }
-            return null;
-        }
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Event;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.logging.log4j.core.appender.AppenderLoggingException;
+import org.apache.logging.log4j.core.appender.ManagerFactory;
+
+/**
+ * Manager for FlumeAvroAppenders.
+ */
+public class FlumeAvroManager extends AbstractFlumeManager {
+
+    private static final int MAX_RECONNECTS = 3;
+    private static final int MINIMUM_TIMEOUT = 1000;
+
+    private static AvroManagerFactory factory = new AvroManagerFactory();
+
+    private final Agent[] agents;
+
+    private final int batchSize;
+
+    private final long delayNanos;
+    private final int delayMillis;
+
+    private final int retries;
+
+    private final int connectTimeoutMillis;
+
+    private final int requestTimeoutMillis;
+
+    private final int current = 0;
+
+    private RpcClient rpcClient = null;
+
+    private BatchEvent batchEvent = new BatchEvent();
+    private long nextSend = 0;
+
+    /**
+     * Constructor
+     * @param name The unique name of this manager.
+     * @param agents An array of Agents.
+     * @param batchSize The number of events to include in a batch.
+     * @param retries The number of times to retry connecting before giving up.
+     * @param connectTimeout The connection timeout in ms.
+     * @param requestTimeout The request timeout in ms.
+     *
+     */
+    protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
+                               final int delayMillis, final int retries, final int connectTimeout, final int requestTimeout) {
+        super(name);
+        this.agents = agents;
+        this.batchSize = batchSize;
+        this.delayMillis = delayMillis;
+        this.delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis);
+        this.retries = retries;
+        this.connectTimeoutMillis = connectTimeout;
+        this.requestTimeoutMillis = requestTimeout;
+        this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
+    }
+
+    /**
+     * Returns a FlumeAvroManager.
+     * @param name The name of the manager.
+     * @param agents The agents to use.
+     * @param batchSize The number of events to include in a batch.
+     * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
+     * @param retries The number of times to retry connecting before giving up.
+     * @param connectTimeoutMillis The connection timeout in ms.
+     * @param requestTimeoutMillis The request timeout in ms.
+     * @return A FlumeAvroManager.
+     */
+    public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, final int delayMillis,
+                                              final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
+        if (agents == null || agents.length == 0) {
+            throw new IllegalArgumentException("At least one agent is required");
+        }
+
+        if (batchSize <= 0) {
+            batchSize = 1;
+        }
+
+        final StringBuilder sb = new StringBuilder("FlumeAvro[");
+        boolean first = true;
+        for (final Agent agent : agents) {
+            if (!first) {
+                sb.append(',');
+            }
+            sb.append(agent.getHost()).append(':').append(agent.getPort());
+            first = false;
+        }
+        sb.append(']');
+        return getManager(sb.toString(), factory,
+                new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis, requestTimeoutMillis));
+    }
+
+    /**
+     * Returns the agents.
+     * @return The agent array.
+     */
+    public Agent[] getAgents() {
+        return agents;
+    }
+
+    /**
+     * Returns the index of the current agent.
+     * @return The index for the current agent.
+     */
+    public int getCurrent() {
+        return current;
+    }
+
+    public int getRetries() {
+        return retries;
+    }
+
+    public int getConnectTimeoutMillis() {
+        return connectTimeoutMillis;
+    }
+
+    public int getRequestTimeoutMillis() {
+        return requestTimeoutMillis;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public int getDelayMillis() {
+        return delayMillis;
+    }
+
+    public synchronized void send(final BatchEvent events) {
+        if (rpcClient == null) {
+            rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
+        }
+
+        if (rpcClient != null) {
+            try {
+                LOGGER.trace("Sending batch of {} events", events.getEvents().size());
+                rpcClient.appendBatch(events.getEvents());
+            } catch (final Exception ex) {
+                rpcClient.close();
+                rpcClient = null;
+                final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
+                    agents[current].getPort();
+                LOGGER.warn(msg, ex);
+                throw new AppenderLoggingException("No Flume agents are available");
+            }
+        }  else {
+            final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
+                agents[current].getPort();
+            LOGGER.warn(msg);
+            throw new AppenderLoggingException("No Flume agents are available");
+        }
+    }
+
+    @Override
+    public synchronized void send(final Event event)  {
+        if (batchSize == 1) {
+            if (rpcClient == null) {
+                rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
+            }
+
+            if (rpcClient != null) {
+                try {
+                    rpcClient.append(event);
+                } catch (final Exception ex) {
+                    rpcClient.close();
+                    rpcClient = null;
+                    final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
+                            agents[current].getPort();
+                    LOGGER.warn(msg, ex);
+                    throw new AppenderLoggingException("No Flume agents are available");
+                }
+            } else {
+                final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
+                        agents[current].getPort();
+                LOGGER.warn(msg);
+                throw new AppenderLoggingException("No Flume agents are available");
+            }
+        } else {
+            batchEvent.addEvent(event);
+            final int eventCount = batchEvent.getEvents().size();
+            if (eventCount == 1) {
+                nextSend = System.nanoTime() + delayNanos;
+            }
+            if (eventCount >= batchSize || System.nanoTime() >= nextSend) {
+                send(batchEvent);
+                batchEvent = new BatchEvent();
+            }
+        }
+    }
+
+    /**
+     * There is a very good chance that this will always return the first agent even if it isn't available.
+     * @param agents The list of agents to choose from
+     * @return The FlumeEventAvroServer.
+     */
+    private RpcClient connect(final Agent[] agents, int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
+        try {
+            final Properties props = new Properties();
+
+            props.put("client.type", "default_failover");
+
+            int agentCount = 1;
+            final StringBuilder sb = new StringBuilder();
+            for (final Agent agent : agents) {
+                if (sb.length() > 0) {
+                    sb.append(' ');
+                }
+                final String hostName = "host" + agentCount++;
+                props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort());
+                sb.append(hostName);
+            }
+            props.put("hosts", sb.toString());
+            if (batchSize > 0) {
+                props.put("batch-size", Integer.toString(batchSize));
+            }
+            if (retries > 1) {
+                if (retries > MAX_RECONNECTS) {
+                    retries = MAX_RECONNECTS;
+                }
+                props.put("max-attempts", Integer.toString(retries * agents.length));
+            }
+            if (requestTimeoutMillis >= MINIMUM_TIMEOUT) {
+                props.put("request-timeout", Integer.toString(requestTimeoutMillis));
+            }
+            if (connectTimeoutMillis >= MINIMUM_TIMEOUT) {
+                props.put("connect-timeout", Integer.toString(connectTimeoutMillis));
+            }
+            return RpcClientFactory.getInstance(props);
+        } catch (final Exception ex) {
+            LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
+            return null;
+        }
+    }
+
+    @Override
+    protected void releaseSub(final long timeout, final TimeUnit timeUnit) {
+        if (rpcClient != null) {
+            try {
+                synchronized(this) {
+                    try {
+                        if (batchSize > 1 && batchEvent.getEvents().size() > 0) {
+                            send(batchEvent);
+                        }
+                    } catch (final Exception ex) {
+                        LOGGER.error("Error sending final batch: {}", ex.getMessage());
+                    }
+                }
+                rpcClient.close();
+            } catch (final Exception ex) {
+                LOGGER.error("Attempt to close RPC client failed", ex);
+            }
+        }
+        rpcClient = null;
+    }
+
+    /**
+     * Factory data.
+     */
+    private static class FactoryData {
+        private final String name;
+        private final Agent[] agents;
+        private final int batchSize;
+        private final int delayMillis;
+        private final int retries;
+        private final int conntectTimeoutMillis;
+        private final int requestTimeoutMillis;
+
+        /**
+         * Constructor.
+         * @param name The name of the Appender.
+         * @param agents The agents.
+         * @param batchSize The number of events to include in a batch.
+         */
+        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis,
+                final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
+            this.name = name;
+            this.agents = agents;
+            this.batchSize = batchSize;
+            this.delayMillis = delayMillis;
+            this.retries = retries;
+            this.conntectTimeoutMillis = connectTimeoutMillis;
+            this.requestTimeoutMillis = requestTimeoutMillis;
+        }
+    }
+
+    /**
+     * Avro Manager Factory.
+     */
+    private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
+
+        /**
+         * Create the FlumeAvroManager.
+         * @param name The name of the entity to manage.
+         * @param data The data required to create the entity.
+         * @return The FlumeAvroManager.
+         */
+        @Override
+        public FlumeAvroManager createManager(final String name, final FactoryData data) {
+            try {
+
+                return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis,
+                        data.retries, data.conntectTimeoutMillis, data.requestTimeoutMillis);
+            } catch (final Exception ex) {
+                LOGGER.error("Could not create FlumeAvroManager", ex);
+            }
+            return null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
----------------------------------------------------------------------
diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
index a23c2ed..ec5ee19 100644
--- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
+++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
@@ -19,6 +19,7 @@ package org.apache.logging.log4j.flume.appender;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
@@ -119,7 +120,7 @@ public class FlumeEmbeddedManager extends AbstractFlumeManager {
     }
 
     @Override
-    protected void releaseSub() {
+    protected void releaseSub(final long timeout, final TimeUnit timeUnit) {
         agent.stop();
     }
 

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
----------------------------------------------------------------------
diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
index 0f6497b..7f21c9b 100644
--- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
+++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
@@ -76,9 +76,9 @@ public class FlumePersistentManager extends FlumeAvroManager {
 
     private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
 
-    private static final int SHUTDOWN_WAIT_SECONDS = 60;
+    private static final long SHUTDOWN_WAIT_MILLIS = 60000;
 
-    private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
+    private static final long LOCK_TIMEOUT_SLEEP_MILLIS = 500;
 
     private static BDBManagerFactory factory = new BDBManagerFactory();
 
@@ -217,15 +217,17 @@ public class FlumePersistentManager extends FlumeAvroManager {
     }
 
     @Override
-    protected void releaseSub() {
+    protected void releaseSub(final long timeout, final TimeUnit timeUnit) {
         LOGGER.debug("Shutting down FlumePersistentManager");
         worker.shutdown();
-        try {
-            worker.join(TimeUnit.SECONDS.toMillis(SHUTDOWN_WAIT_SECONDS));
+        final long requestedTimeoutMillis = timeUnit.toMillis(timeout);
+        final long shutdownWaitMillis = requestedTimeoutMillis < 0 ? SHUTDOWN_WAIT_MILLIS : requestedTimeoutMillis;
+		try {
+            worker.join(shutdownWaitMillis);
         } catch (final InterruptedException ie) {
             // Ignore the exception and shutdown.
         }
-        ExecutorServices.shutdown(threadPool, SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS, toString());
+        ExecutorServices.shutdown(threadPool, shutdownWaitMillis, TimeUnit.MILLISECONDS, toString());
         try {
             worker.join();
         } catch (final InterruptedException ex) {
@@ -243,7 +245,7 @@ public class FlumePersistentManager extends FlumeAvroManager {
         } catch (final Exception ex) {
             logWarn("Failed to close environment", ex);
         }
-        super.releaseSub();
+        super.releaseSub(timeout, timeUnit);
     }
 
     private void doSend(final SimpleEvent event) {