You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by rp...@apache.org on 2016/09/08 14:50:35 UTC
[06/50] [abbrv] logging-log4j2 git commit: Propagate and use timeout
values from Configurator.shutdown(LoggerContext, long, TimeUnit).
Propagate and use timeout values from
Configurator.shutdown(LoggerContext,long,TimeUnit).
Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/17046951
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/17046951
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/17046951
Branch: refs/heads/LOG4J2-1349-gcfree-threadcontext
Commit: 170469514b374eb5a5a33bde6936162fd608f097
Parents: 56a688f
Author: Gary Gregory <gg...@apache.org>
Authored: Tue Sep 6 05:22:19 2016 -0400
Committer: Gary Gregory <gg...@apache.org>
Committed: Tue Sep 6 05:22:19 2016 -0400
----------------------------------------------------------------------
.../logging/log4j/core/AbstractLifeCycle.java | 2 +-
.../log4j/core/appender/AbstractManager.java | 13 +-
.../appender/AbstractOutputStreamAppender.java | 2 +-
.../core/appender/AbstractWriterAppender.java | 2 +-
.../core/appender/OutputStreamManager.java | 3 +-
.../log4j/core/appender/WriterManager.java | 3 +-
.../appender/db/AbstractDatabaseAppender.java | 2 +-
.../appender/db/AbstractDatabaseManager.java | 3 +-
.../log4j/core/appender/mom/JmsAppender.java | 2 +-
.../log4j/core/appender/mom/JmsManager.java | 6 +-
.../appender/mom/jeromq/JeroMqAppender.java | 2 +-
.../core/appender/mom/jeromq/JeroMqManager.java | 3 +-
.../core/appender/mom/kafka/KafkaAppender.java | 2 +-
.../core/appender/mom/kafka/KafkaManager.java | 17 +-
.../log4j/core/filter/AbstractFilterable.java | 12 +-
.../logging/log4j/core/net/JndiManager.java | 3 +-
.../log4j/core/net/server/JmsServer.java | 2 +-
.../db/AbstractDatabaseAppenderTest.java | 4 +-
.../db/AbstractDatabaseManagerTest.java | 2 +-
.../log4j/flume/appender/FlumeAppender.java | 2 +-
.../log4j/flume/appender/FlumeAvroManager.java | 666 +++++++++----------
.../flume/appender/FlumeEmbeddedManager.java | 3 +-
.../flume/appender/FlumePersistentManager.java | 16 +-
23 files changed, 386 insertions(+), 386 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/AbstractLifeCycle.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/AbstractLifeCycle.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/AbstractLifeCycle.java
index 3fb15bf..33aabd1 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/AbstractLifeCycle.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/AbstractLifeCycle.java
@@ -28,7 +28,7 @@ import org.apache.logging.log4j.status.StatusLogger;
*/
public class AbstractLifeCycle implements LifeCycle {
- public static final int DEFAULT_STOP_TIMEOUT = -1;
+ public static final int DEFAULT_STOP_TIMEOUT = 0;
public static final TimeUnit DEFAULT_STOP_TIMEUNIT = TimeUnit.MILLISECONDS;
/**
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractManager.java
index 0a036a0..dfe7ffd 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractManager.java
@@ -18,11 +18,13 @@ package org.apache.logging.log4j.core.appender;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.AbstractLifeCycle;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.status.StatusLogger;
@@ -69,20 +71,23 @@ public abstract class AbstractManager implements AutoCloseable {
*/
@Override
public void close() {
+ stop(AbstractLifeCycle.DEFAULT_STOP_TIMEOUT, AbstractLifeCycle.DEFAULT_STOP_TIMEUNIT);
+ }
+
+ public void stop(final long timeout, final TimeUnit timeUnit) {
LOCK.lock();
try {
--count;
if (count <= 0) {
MAP.remove(name);
LOGGER.debug("Shutting down {} {}", this.getClass().getSimpleName(), getName());
- releaseSub();
+ releaseSub(timeout, timeUnit);
}
} finally {
LOCK.unlock();
}
}
-
/**
* Retrieves a Manager if it has been previously created or creates a new Manager.
* @param name The name of the Manager to retrieve.
@@ -136,8 +141,10 @@ public abstract class AbstractManager implements AutoCloseable {
/**
* May be overridden by Managers to perform processing while the Manager is being released and the
* lock is held.
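+ * @param timeout the timeout value passed to {@link #stop(long, TimeUnit)}; overriding Managers may use it to bound how long releasing waits
+ * @param timeUnit the unit of the {@code timeout} argument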
*/
- protected void releaseSub() {
+ protected void releaseSub(long timeout, TimeUnit timeUnit) {
// This default implementation does nothing.
}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractOutputStreamAppender.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractOutputStreamAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractOutputStreamAppender.java
index 98030fc..4d16be8 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractOutputStreamAppender.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractOutputStreamAppender.java
@@ -139,7 +139,7 @@ public abstract class AbstractOutputStreamAppender<M extends OutputStreamManager
@Override
protected boolean stop(final long timeout, final TimeUnit timeUnit, final boolean changeLifeCycleState) {
super.stop(timeout, timeUnit, changeLifeCycleState);
- manager.close();
+ manager.stop(timeout, timeUnit);
if (changeLifeCycleState) {
setStopped();
}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractWriterAppender.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractWriterAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractWriterAppender.java
index 8b85588..7e1d46b 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractWriterAppender.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AbstractWriterAppender.java
@@ -119,7 +119,7 @@ public abstract class AbstractWriterAppender<M extends WriterManager> extends Ab
public boolean stop(final long timeout, final TimeUnit timeUnit) {
setStopping();
super.stop(timeout, timeUnit, false);
- manager.close();
+ manager.stop(timeout, timeUnit);
setStopped();
return true;
}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/OutputStreamManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/OutputStreamManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/OutputStreamManager.java
index 81d5027..f42cc72 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/OutputStreamManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/OutputStreamManager.java
@@ -21,6 +21,7 @@ import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LoggerContext;
@@ -130,7 +131,7 @@ public class OutputStreamManager extends AbstractManager implements ByteBufferDe
* Default hook to write footer during close.
*/
@Override
- public void releaseSub() {
+ public void releaseSub(long timeout, TimeUnit timeUnit) {
writeFooter();
closeOutputStream();
}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/WriterManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/WriterManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/WriterManager.java
index cee99e8..5a95794 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/WriterManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/WriterManager.java
@@ -18,6 +18,7 @@ package org.apache.logging.log4j.core.appender;
import java.io.IOException;
import java.io.Writer;
+import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.core.StringLayout;
@@ -98,7 +99,7 @@ public class WriterManager extends AbstractManager {
* Default hook to write footer during close.
*/
@Override
- public void releaseSub() {
+ public void releaseSub(long timeout, TimeUnit timeUnit) {
writeFooter();
closeWriter();
}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseAppender.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseAppender.java
index 803e0cc..85a3a67 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseAppender.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseAppender.java
@@ -95,7 +95,7 @@ public abstract class AbstractDatabaseAppender<T extends AbstractDatabaseManager
setStopping();
super.stop(timeout, timeUnit, false);
if (this.getManager() != null) {
- this.getManager().close();
+ this.getManager().stop(timeout, timeUnit);
}
setStopped();
return true;
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManager.java
index 4457008..26f0070 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManager.java
@@ -19,6 +19,7 @@ package org.apache.logging.log4j.core.appender.db;
import java.io.Flushable;
import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractManager;
@@ -170,7 +171,7 @@ public abstract class AbstractDatabaseManager extends AbstractManager implements
}
@Override
- public final void releaseSub() {
+ public final void releaseSub(long timeout, TimeUnit timeUnit) {
this.shutdown();
}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/JmsAppender.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/JmsAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/JmsAppender.java
index 442243b..8e21de8 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/JmsAppender.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/JmsAppender.java
@@ -74,7 +74,7 @@ public class JmsAppender extends AbstractAppender {
public boolean stop(final long timeout, final TimeUnit timeUnit) {
setStopping();
super.stop(timeout, timeUnit, false);
- this.manager.close();
+ this.manager.stop(timeout, timeUnit);
setStopped();
return true;
}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/JmsManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/JmsManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/JmsManager.java
index ff04c5a..2d8dfda 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/JmsManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/JmsManager.java
@@ -18,6 +18,8 @@
package org.apache.logging.log4j.core.appender.mom;
import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
@@ -123,7 +125,7 @@ public class JmsManager extends AbstractManager {
}
@Override
- protected void releaseSub() {
+ protected void releaseSub(long timeout, TimeUnit timeUnit) {
try {
this.session.close();
} catch (final JMSException ignored) {
@@ -134,7 +136,7 @@ public class JmsManager extends AbstractManager {
} catch (final JMSException ignored) {
// ignore
}
- this.jndiManager.close();
+ this.jndiManager.stop(timeout, timeUnit);
}
private static class JmsConfiguration {
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
index 0f96a21..5a60653 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
@@ -153,7 +153,7 @@ public final class JeroMqAppender extends AbstractAppender {
public boolean stop(final long timeout, final TimeUnit timeUnit) {
setStopping();
super.stop(timeout, timeUnit, false);
- manager.close();
+ manager.stop(timeout, timeUnit);
setStopped();
return true;
}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
index 6f106be..c460d93 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
@@ -19,6 +19,7 @@ package org.apache.logging.log4j.core.appender.mom.jeromq;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.appender.AbstractManager;
@@ -104,7 +105,7 @@ public class JeroMqManager extends AbstractManager {
}
@Override
- protected void releaseSub() {
+ protected void releaseSub(long timeout, TimeUnit timeUnit) {
publisher.close();
}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
index 6f8e872..bee9437 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
@@ -104,7 +104,7 @@ public final class KafkaAppender extends AbstractAppender {
public boolean stop(final long timeout, final TimeUnit timeUnit) {
setStopping();
super.stop(timeout, timeUnit, false);
- manager.close();
+ manager.stop(timeout, timeUnit);
setStopped();
return true;
}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
index 35aafb7..58c4ba2 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
@@ -56,22 +56,9 @@ public class KafkaManager extends AbstractManager {
}
@Override
- public void releaseSub() {
+ public void releaseSub(final long timeout, final TimeUnit timeUnit) {
if (producer != null) {
- // This thread is a workaround for this Kafka issue: https://issues.apache.org/jira/browse/KAFKA-1660
- final Runnable task = new Runnable() {
- @Override
- public void run() {
- if (producer != null) {
- producer.close();
- }
- }
- };
- try {
- getLoggerContext().submitDaemon(task).get(timeoutMillis, TimeUnit.MILLISECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- // ignore
- }
+ producer.close(timeout, timeUnit);
}
}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/filter/AbstractFilterable.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/filter/AbstractFilterable.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/filter/AbstractFilterable.java
index 81b8ea9..92b8eee 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/filter/AbstractFilterable.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/filter/AbstractFilterable.java
@@ -146,12 +146,7 @@ public abstract class AbstractFilterable extends AbstractLifeCycle implements Fi
*/
@Override
public boolean stop(final long timeout, final TimeUnit timeUnit) {
- this.setStopping();
- if (filter != null) {
- filter.stop(timeout, timeUnit);
- }
- this.setStopped();
- return true;
+ return stop(timeout, timeUnit, true);
}
/**
@@ -161,13 +156,14 @@ public abstract class AbstractFilterable extends AbstractLifeCycle implements Fi
if (changeLifeCycleState) {
this.setStopping();
}
+ boolean stopped = true;
if (filter != null) {
- filter.stop(timeout, timeUnit);
+ stopped = filter.stop(timeout, timeUnit);
}
if (changeLifeCycleState) {
this.setStopped();
}
- return true;
+ return stopped;
}
/**
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/net/JndiManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/net/JndiManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/net/JndiManager.java
index c413373..91f9bcb 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/net/JndiManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/net/JndiManager.java
@@ -18,6 +18,7 @@
package org.apache.logging.log4j.core.net;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import javax.naming.Context;
import javax.naming.InitialContext;
@@ -111,7 +112,7 @@ public class JndiManager extends AbstractManager {
}
@Override
- protected void releaseSub() {
+ protected void releaseSub(long timeout, TimeUnit timeUnit) {
JndiCloser.closeSilently(this.context);
}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/main/java/org/apache/logging/log4j/core/net/server/JmsServer.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/net/server/JmsServer.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/net/server/JmsServer.java
index 887bcc5..15e0a3f 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/net/server/JmsServer.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/net/server/JmsServer.java
@@ -110,7 +110,7 @@ public class JmsServer extends LogEventListener implements MessageListener, Life
} catch (final JMSException e) {
LOGGER.debug("Exception closing {}", messageConsumer, e);
}
- jmsManager.close();
+ jmsManager.stop(timeout, timeUnit);
return true;
}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseAppenderTest.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseAppenderTest.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseAppenderTest.java
index d7f955d..738b2d5 100644
--- a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseAppenderTest.java
+++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseAppenderTest.java
@@ -79,7 +79,7 @@ public class AbstractDatabaseAppenderTest {
verify(this.manager, this.appender);
reset(this.manager, this.appender);
- this.manager.close();
+ this.manager.shutdownInternal();
expectLastCall();
replay(this.manager, this.appender);
@@ -111,7 +111,7 @@ public class AbstractDatabaseAppenderTest {
verify(this.manager, this.appender, newManager);
reset(this.manager, this.appender, newManager);
- newManager.close();
+ newManager.shutdownInternal();
expectLastCall();
replay(this.manager, this.appender, newManager);
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManagerTest.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManagerTest.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManagerTest.java
index e462b31..36c89d2 100644
--- a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManagerTest.java
+++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/db/AbstractDatabaseManagerTest.java
@@ -92,7 +92,7 @@ public class AbstractDatabaseManagerTest {
expectLastCall();
replay(this.manager);
- this.manager.releaseSub();
+ this.manager.releaseSub(-1, null);
assertFalse("The manager should not be running anymore.", this.manager.isRunning());
}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
----------------------------------------------------------------------
diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
index 3e2cf10..bdad7e4 100644
--- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
+++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
@@ -110,7 +110,7 @@ public final class FlumeAppender extends AbstractAppender implements FlumeEventF
public boolean stop(final long timeout, final TimeUnit timeUnit) {
setStopping();
super.stop(timeout, timeUnit, false);
- manager.close();
+ manager.stop(timeout, timeUnit);
setStopped();
return true;
}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
----------------------------------------------------------------------
diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
index dd5dd96..61b2392 100644
--- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
+++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
@@ -1,333 +1,333 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache license, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the license for the specific language governing permissions and
- * limitations under the license.
- */
-package org.apache.logging.log4j.flume.appender;
-
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flume.Event;
-import org.apache.flume.api.RpcClient;
-import org.apache.flume.api.RpcClientFactory;
-import org.apache.logging.log4j.core.appender.AppenderLoggingException;
-import org.apache.logging.log4j.core.appender.ManagerFactory;
-
-/**
- * Manager for FlumeAvroAppenders.
- */
-public class FlumeAvroManager extends AbstractFlumeManager {
-
- private static final int MAX_RECONNECTS = 3;
- private static final int MINIMUM_TIMEOUT = 1000;
-
- private static AvroManagerFactory factory = new AvroManagerFactory();
-
- private final Agent[] agents;
-
- private final int batchSize;
-
- private final long delayNanos;
- private final int delayMillis;
-
- private final int retries;
-
- private final int connectTimeoutMillis;
-
- private final int requestTimeoutMillis;
-
- private final int current = 0;
-
- private RpcClient rpcClient = null;
-
- private BatchEvent batchEvent = new BatchEvent();
- private long nextSend = 0;
-
- /**
- * Constructor
- * @param name The unique name of this manager.
- * @param agents An array of Agents.
- * @param batchSize The number of events to include in a batch.
- * @param retries The number of times to retry connecting before giving up.
- * @param connectTimeout The connection timeout in ms.
- * @param requestTimeout The request timeout in ms.
- *
- */
- protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
- final int delayMillis, final int retries, final int connectTimeout, final int requestTimeout) {
- super(name);
- this.agents = agents;
- this.batchSize = batchSize;
- this.delayMillis = delayMillis;
- this.delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis);
- this.retries = retries;
- this.connectTimeoutMillis = connectTimeout;
- this.requestTimeoutMillis = requestTimeout;
- this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
- }
-
- /**
- * Returns a FlumeAvroManager.
- * @param name The name of the manager.
- * @param agents The agents to use.
- * @param batchSize The number of events to include in a batch.
- * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
- * @param retries The number of times to retry connecting before giving up.
- * @param connectTimeoutMillis The connection timeout in ms.
- * @param requestTimeoutMillis The request timeout in ms.
- * @return A FlumeAvroManager.
- */
- public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, final int delayMillis,
- final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
- if (agents == null || agents.length == 0) {
- throw new IllegalArgumentException("At least one agent is required");
- }
-
- if (batchSize <= 0) {
- batchSize = 1;
- }
-
- final StringBuilder sb = new StringBuilder("FlumeAvro[");
- boolean first = true;
- for (final Agent agent : agents) {
- if (!first) {
- sb.append(',');
- }
- sb.append(agent.getHost()).append(':').append(agent.getPort());
- first = false;
- }
- sb.append(']');
- return getManager(sb.toString(), factory,
- new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis, requestTimeoutMillis));
- }
-
- /**
- * Returns the agents.
- * @return The agent array.
- */
- public Agent[] getAgents() {
- return agents;
- }
-
- /**
- * Returns the index of the current agent.
- * @return The index for the current agent.
- */
- public int getCurrent() {
- return current;
- }
-
- public int getRetries() {
- return retries;
- }
-
- public int getConnectTimeoutMillis() {
- return connectTimeoutMillis;
- }
-
- public int getRequestTimeoutMillis() {
- return requestTimeoutMillis;
- }
-
- public int getBatchSize() {
- return batchSize;
- }
-
- public int getDelayMillis() {
- return delayMillis;
- }
-
- public synchronized void send(final BatchEvent events) {
- if (rpcClient == null) {
- rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
- }
-
- if (rpcClient != null) {
- try {
- LOGGER.trace("Sending batch of {} events", events.getEvents().size());
- rpcClient.appendBatch(events.getEvents());
- } catch (final Exception ex) {
- rpcClient.close();
- rpcClient = null;
- final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
- agents[current].getPort();
- LOGGER.warn(msg, ex);
- throw new AppenderLoggingException("No Flume agents are available");
- }
- } else {
- final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
- agents[current].getPort();
- LOGGER.warn(msg);
- throw new AppenderLoggingException("No Flume agents are available");
- }
- }
-
- @Override
- public synchronized void send(final Event event) {
- if (batchSize == 1) {
- if (rpcClient == null) {
- rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
- }
-
- if (rpcClient != null) {
- try {
- rpcClient.append(event);
- } catch (final Exception ex) {
- rpcClient.close();
- rpcClient = null;
- final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
- agents[current].getPort();
- LOGGER.warn(msg, ex);
- throw new AppenderLoggingException("No Flume agents are available");
- }
- } else {
- final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
- agents[current].getPort();
- LOGGER.warn(msg);
- throw new AppenderLoggingException("No Flume agents are available");
- }
- } else {
- batchEvent.addEvent(event);
- final int eventCount = batchEvent.getEvents().size();
- if (eventCount == 1) {
- nextSend = System.nanoTime() + delayNanos;
- }
- if (eventCount >= batchSize || System.nanoTime() >= nextSend) {
- send(batchEvent);
- batchEvent = new BatchEvent();
- }
- }
- }
-
- /**
- * There is a very good chance that this will always return the first agent even if it isn't available.
- * @param agents The list of agents to choose from
- * @return The FlumeEventAvroServer.
- */
- private RpcClient connect(final Agent[] agents, int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
- try {
- final Properties props = new Properties();
-
- props.put("client.type", "default_failover");
-
- int agentCount = 1;
- final StringBuilder sb = new StringBuilder();
- for (final Agent agent : agents) {
- if (sb.length() > 0) {
- sb.append(' ');
- }
- final String hostName = "host" + agentCount++;
- props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort());
- sb.append(hostName);
- }
- props.put("hosts", sb.toString());
- if (batchSize > 0) {
- props.put("batch-size", Integer.toString(batchSize));
- }
- if (retries > 1) {
- if (retries > MAX_RECONNECTS) {
- retries = MAX_RECONNECTS;
- }
- props.put("max-attempts", Integer.toString(retries * agents.length));
- }
- if (requestTimeoutMillis >= MINIMUM_TIMEOUT) {
- props.put("request-timeout", Integer.toString(requestTimeoutMillis));
- }
- if (connectTimeoutMillis >= MINIMUM_TIMEOUT) {
- props.put("connect-timeout", Integer.toString(connectTimeoutMillis));
- }
- return RpcClientFactory.getInstance(props);
- } catch (final Exception ex) {
- LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
- return null;
- }
- }
-
- @Override
- protected void releaseSub() {
- if (rpcClient != null) {
- try {
- synchronized(this) {
- try {
- if (batchSize > 1 && batchEvent.getEvents().size() > 0) {
- send(batchEvent);
- }
- } catch (final Exception ex) {
- LOGGER.error("Error sending final batch: {}", ex.getMessage());
- }
- }
- rpcClient.close();
- } catch (final Exception ex) {
- LOGGER.error("Attempt to close RPC client failed", ex);
- }
- }
- rpcClient = null;
- }
-
- /**
- * Factory data.
- */
- private static class FactoryData {
- private final String name;
- private final Agent[] agents;
- private final int batchSize;
- private final int delayMillis;
- private final int retries;
- private final int conntectTimeoutMillis;
- private final int requestTimeoutMillis;
-
- /**
- * Constructor.
- * @param name The name of the Appender.
- * @param agents The agents.
- * @param batchSize The number of events to include in a batch.
- */
- public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis,
- final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
- this.name = name;
- this.agents = agents;
- this.batchSize = batchSize;
- this.delayMillis = delayMillis;
- this.retries = retries;
- this.conntectTimeoutMillis = connectTimeoutMillis;
- this.requestTimeoutMillis = requestTimeoutMillis;
- }
- }
-
- /**
- * Avro Manager Factory.
- */
- private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
-
- /**
- * Create the FlumeAvroManager.
- * @param name The name of the entity to manage.
- * @param data The data required to create the entity.
- * @return The FlumeAvroManager.
- */
- @Override
- public FlumeAvroManager createManager(final String name, final FactoryData data) {
- try {
-
- return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis,
- data.retries, data.conntectTimeoutMillis, data.requestTimeoutMillis);
- } catch (final Exception ex) {
- LOGGER.error("Could not create FlumeAvroManager", ex);
- }
- return null;
- }
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Event;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.logging.log4j.core.appender.AppenderLoggingException;
+import org.apache.logging.log4j.core.appender.ManagerFactory;
+
+/**
+ * Manager for FlumeAvroAppenders.
+ * <p>
+ * Owns a single Flume {@link RpcClient} built with the {@code default_failover} client type over all configured
+ * agents. Events are either written one at a time (batch size 1) or collected in a {@link BatchEvent} that is
+ * written once it holds {@code batchSize} events or once {@code delayMillis} have elapsed since its first event.
+ * All access to the client and to the pending batch happens while holding this manager's monitor.
+ * </p>
+ */
+public class FlumeAvroManager extends AbstractFlumeManager {
+
+    private static final int MAX_RECONNECTS = 3;
+    private static final int MINIMUM_TIMEOUT = 1000;
+    private static final String NO_AGENTS_MESSAGE = "No Flume agents are available";
+
+    private static final AvroManagerFactory FACTORY = new AvroManagerFactory();
+
+    private final Agent[] agents;
+
+    private final int batchSize;
+
+    private final long delayNanos;
+    private final int delayMillis;
+
+    private final int retries;
+
+    private final int connectTimeoutMillis;
+
+    private final int requestTimeoutMillis;
+
+    // Never reassigned in this class; only used to pick the agent named in failure messages.
+    private final int current = 0;
+
+    private RpcClient rpcClient = null;
+
+    private BatchEvent batchEvent = new BatchEvent();
+    private long nextSend = 0;
+
+    /**
+     * Constructor. Attempts to connect immediately; if that fails the client stays {@code null} and the next
+     * {@code send} retries the connection.
+     *
+     * @param name The unique name of this manager.
+     * @param shortName The appender name supplied by the factory data; not used by this class.
+     * @param agents An array of Agents.
+     * @param batchSize The number of events to include in a batch.
+     * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
+     * @param retries The number of times to retry connecting before giving up.
+     * @param connectTimeout The connection timeout in ms.
+     * @param requestTimeout The request timeout in ms.
+     */
+    protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
+            final int delayMillis, final int retries, final int connectTimeout, final int requestTimeout) {
+        super(name);
+        this.agents = agents;
+        this.batchSize = batchSize;
+        this.delayMillis = delayMillis;
+        this.delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis);
+        this.retries = retries;
+        this.connectTimeoutMillis = connectTimeout;
+        this.requestTimeoutMillis = requestTimeout;
+        this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
+    }
+
+    /**
+     * Returns a FlumeAvroManager. The manager is looked up under a name derived from the agents' host:port pairs.
+     *
+     * @param name The name of the manager.
+     * @param agents The agents to use.
+     * @param batchSize The number of events to include in a batch; values below 1 are treated as 1.
+     * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
+     * @param retries The number of times to retry connecting before giving up.
+     * @param connectTimeoutMillis The connection timeout in ms.
+     * @param requestTimeoutMillis The request timeout in ms.
+     * @return A FlumeAvroManager.
+     * @throws IllegalArgumentException if {@code agents} is {@code null} or empty.
+     */
+    public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize,
+            final int delayMillis, final int retries, final int connectTimeoutMillis,
+            final int requestTimeoutMillis) {
+        if (agents == null || agents.length == 0) {
+            throw new IllegalArgumentException("At least one agent is required");
+        }
+
+        if (batchSize <= 0) {
+            batchSize = 1;
+        }
+
+        final StringBuilder sb = new StringBuilder("FlumeAvro[");
+        boolean first = true;
+        for (final Agent agent : agents) {
+            if (!first) {
+                sb.append(',');
+            }
+            sb.append(agent.getHost()).append(':').append(agent.getPort());
+            first = false;
+        }
+        sb.append(']');
+        return getManager(sb.toString(), FACTORY,
+                new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis,
+                        requestTimeoutMillis));
+    }
+
+    /**
+     * Returns the agents.
+     * @return The agent array.
+     */
+    public Agent[] getAgents() {
+        return agents;
+    }
+
+    /**
+     * Returns the index of the current agent.
+     * @return The index for the current agent.
+     */
+    public int getCurrent() {
+        return current;
+    }
+
+    /**
+     * Returns the configured number of connection retries.
+     * @return The retry count passed to the constructor.
+     */
+    public int getRetries() {
+        return retries;
+    }
+
+    /**
+     * Returns the connection timeout.
+     * @return The connection timeout in ms.
+     */
+    public int getConnectTimeoutMillis() {
+        return connectTimeoutMillis;
+    }
+
+    /**
+     * Returns the request timeout.
+     * @return The request timeout in ms.
+     */
+    public int getRequestTimeoutMillis() {
+        return requestTimeoutMillis;
+    }
+
+    /**
+     * Returns the batch size.
+     * @return The number of events to include in a batch.
+     */
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    /**
+     * Returns the batch delay.
+     * @return The number of milliseconds to wait before sending an incomplete batch.
+     */
+    public int getDelayMillis() {
+        return delayMillis;
+    }
+
+    /**
+     * Writes a batch of events through the RPC client, connecting first if no client is open.
+     *
+     * @param events The batch to write.
+     * @throws AppenderLoggingException if no client could be created or the write failed. After a failed write
+     *         the client is closed and discarded so that the next call reconnects.
+     */
+    public synchronized void send(final BatchEvent events) {
+        final RpcClient client = connectedClient();
+        try {
+            LOGGER.trace("Sending batch of {} events", events.getEvents().size());
+            client.appendBatch(events.getEvents());
+        } catch (final Exception ex) {
+            throw writeFailed(ex);
+        }
+    }
+
+    /**
+     * Writes a single event. With a batch size of 1 the event is written immediately; otherwise it is added to the
+     * pending batch, which is written once it is full or its delay has elapsed.
+     *
+     * @param event The event to write.
+     * @throws AppenderLoggingException if no client could be created or the write failed. A batch whose write
+     *         failed is kept, so its events are included in the next attempt.
+     */
+    @Override
+    public synchronized void send(final Event event) {
+        if (batchSize == 1) {
+            final RpcClient client = connectedClient();
+            try {
+                client.append(event);
+            } catch (final Exception ex) {
+                throw writeFailed(ex);
+            }
+            return;
+        }
+
+        batchEvent.addEvent(event);
+        final int eventCount = batchEvent.getEvents().size();
+        if (eventCount == 1) {
+            // The delay is measured from the first event of each batch.
+            nextSend = System.nanoTime() + delayNanos;
+        }
+        // nanoTime values may only be compared by subtraction: the counter is allowed to wrap around.
+        if (eventCount >= batchSize || System.nanoTime() - nextSend >= 0) {
+            send(batchEvent);
+            batchEvent = new BatchEvent();
+        }
+    }
+
+    /**
+     * Returns the open client, connecting first if necessary. Callers must hold this manager's monitor.
+     *
+     * @return The open RPC client, never {@code null}.
+     * @throws AppenderLoggingException if no client could be created.
+     */
+    private RpcClient connectedClient() {
+        if (rpcClient == null) {
+            rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
+        }
+        if (rpcClient == null) {
+            LOGGER.warn(describeWriteFailure());
+            throw new AppenderLoggingException(NO_AGENTS_MESSAGE);
+        }
+        return rpcClient;
+    }
+
+    /**
+     * Discards the client after a failed write, logs the failure and builds the exception to throw.
+     *
+     * @param cause The exception raised by the write.
+     * @return The exception for the caller to throw, carrying {@code cause}.
+     */
+    private AppenderLoggingException writeFailed(final Exception cause) {
+        discardClient();
+        LOGGER.warn(describeWriteFailure(), cause);
+        return new AppenderLoggingException(NO_AGENTS_MESSAGE, cause);
+    }
+
+    /**
+     * Builds the warning text naming this manager and the current agent.
+     *
+     * @return The failure message.
+     */
+    private String describeWriteFailure() {
+        return "Unable to write to " + getName() + " at " + agents[current].getHost() + ':'
+                + agents[current].getPort();
+    }
+
+    /**
+     * Closes the client, if any, and clears the field. A failure to close is logged rather than propagated so that
+     * it cannot hide the error that led here. Callers must hold this manager's monitor.
+     */
+    private void discardClient() {
+        final RpcClient client = rpcClient;
+        rpcClient = null;
+        if (client != null) {
+            try {
+                client.close();
+            } catch (final Exception ex) {
+                LOGGER.error("Attempt to close RPC client failed", ex);
+            }
+        }
+    }
+
+    /**
+     * Creates a failover RPC client covering every agent.
+     *
+     * @param agents The agents to register as failover hosts.
+     * @param retries The requested number of attempts per agent; capped at {@code MAX_RECONNECTS} and only applied
+     *        when greater than 1.
+     * @param connectTimeoutMillis The connection timeout in ms; only applied when at least {@code MINIMUM_TIMEOUT}.
+     * @param requestTimeoutMillis The request timeout in ms; only applied when at least {@code MINIMUM_TIMEOUT}.
+     * @return The client, or {@code null} if it could not be created.
+     */
+    private RpcClient connect(final Agent[] agents, final int retries, final int connectTimeoutMillis,
+            final int requestTimeoutMillis) {
+        try {
+            final Properties props = new Properties();
+
+            props.put("client.type", "default_failover");
+
+            // Each agent is registered under an alias (host1, host2, ...); "hosts" lists the aliases.
+            int agentCount = 1;
+            final StringBuilder hostAliases = new StringBuilder();
+            for (final Agent agent : agents) {
+                if (hostAliases.length() > 0) {
+                    hostAliases.append(' ');
+                }
+                final String hostName = "host" + agentCount++;
+                props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort());
+                hostAliases.append(hostName);
+            }
+            props.put("hosts", hostAliases.toString());
+            if (batchSize > 0) {
+                props.put("batch-size", Integer.toString(batchSize));
+            }
+            if (retries > 1) {
+                final int attemptsPerAgent = Math.min(retries, MAX_RECONNECTS);
+                props.put("max-attempts", Integer.toString(attemptsPerAgent * agents.length));
+            }
+            if (requestTimeoutMillis >= MINIMUM_TIMEOUT) {
+                props.put("request-timeout", Integer.toString(requestTimeoutMillis));
+            }
+            if (connectTimeoutMillis >= MINIMUM_TIMEOUT) {
+                props.put("connect-timeout", Integer.toString(connectTimeoutMillis));
+            }
+            return RpcClientFactory.getInstance(props);
+        } catch (final Exception ex) {
+            // The trailing throwable keeps the stack trace in the status log.
+            LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage(), ex);
+            return null;
+        }
+    }
+
+    /**
+     * Flushes any pending batch and closes the RPC client. Nothing is done when no client is open.
+     *
+     * @param timeout The shutdown timeout; not used by this manager.
+     * @param timeUnit The unit of {@code timeout}; not used by this manager.
+     */
+    @Override
+    protected void releaseSub(final long timeout, final TimeUnit timeUnit) {
+        // Same monitor as send(), so the client cannot be swapped or closed underneath a concurrent write.
+        synchronized (this) {
+            if (rpcClient == null) {
+                return;
+            }
+            if (batchSize > 1 && !batchEvent.getEvents().isEmpty()) {
+                try {
+                    send(batchEvent);
+                } catch (final Exception ex) {
+                    LOGGER.error("Error sending final batch: {}", ex.getMessage());
+                }
+            }
+            // A failed flush has already discarded the client; discardClient() tolerates that.
+            discardClient();
+        }
+    }
+
+    /**
+     * Factory data.
+     */
+    private static class FactoryData {
+        private final String name;
+        private final Agent[] agents;
+        private final int batchSize;
+        private final int delayMillis;
+        private final int retries;
+        private final int connectTimeoutMillis;
+        private final int requestTimeoutMillis;
+
+        /**
+         * Constructor.
+         * @param name The name of the Appender.
+         * @param agents The agents.
+         * @param batchSize The number of events to include in a batch.
+         * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
+         * @param retries The number of times to retry connecting before giving up.
+         * @param connectTimeoutMillis The connection timeout in ms.
+         * @param requestTimeoutMillis The request timeout in ms.
+         */
+        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis,
+                final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
+            this.name = name;
+            this.agents = agents;
+            this.batchSize = batchSize;
+            this.delayMillis = delayMillis;
+            this.retries = retries;
+            this.connectTimeoutMillis = connectTimeoutMillis;
+            this.requestTimeoutMillis = requestTimeoutMillis;
+        }
+    }
+
+    /**
+     * Avro Manager Factory.
+     */
+    private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
+
+        /**
+         * Create the FlumeAvroManager.
+         * @param name The name of the entity to manage.
+         * @param data The data required to create the entity.
+         * @return The FlumeAvroManager, or {@code null} if construction failed.
+         */
+        @Override
+        public FlumeAvroManager createManager(final String name, final FactoryData data) {
+            try {
+                return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis,
+                        data.retries, data.connectTimeoutMillis, data.requestTimeoutMillis);
+            } catch (final Exception ex) {
+                LOGGER.error("Could not create FlumeAvroManager", ex);
+            }
+            return null;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
----------------------------------------------------------------------
diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
index a23c2ed..ec5ee19 100644
--- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
+++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
@@ -19,6 +19,7 @@ package org.apache.logging.log4j.flume.appender;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
@@ -119,7 +120,7 @@ public class FlumeEmbeddedManager extends AbstractFlumeManager {
}
@Override
- protected void releaseSub() {
+ protected void releaseSub(final long timeout, final TimeUnit timeUnit) {
agent.stop();
}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/17046951/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
----------------------------------------------------------------------
diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
index 0f6497b..7f21c9b 100644
--- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
+++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
@@ -76,9 +76,9 @@ public class FlumePersistentManager extends FlumeAvroManager {
private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
- private static final int SHUTDOWN_WAIT_SECONDS = 60;
+ private static final long SHUTDOWN_WAIT_MILLIS = 60000;
- private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
+ private static final long LOCK_TIMEOUT_SLEEP_MILLIS = 500;
private static BDBManagerFactory factory = new BDBManagerFactory();
@@ -217,15 +217,17 @@ public class FlumePersistentManager extends FlumeAvroManager {
}
@Override
- protected void releaseSub() {
+ protected void releaseSub(final long timeout, final TimeUnit timeUnit) {
LOGGER.debug("Shutting down FlumePersistentManager");
worker.shutdown();
- try {
- worker.join(TimeUnit.SECONDS.toMillis(SHUTDOWN_WAIT_SECONDS));
+ final long requestedTimeoutMillis = timeUnit.toMillis(timeout);
+ final long shutdownWaitMillis = requestedTimeoutMillis < 0 ? SHUTDOWN_WAIT_MILLIS : requestedTimeoutMillis;
+ try {
+ worker.join(shutdownWaitMillis);
} catch (final InterruptedException ie) {
// Ignore the exception and shutdown.
}
- ExecutorServices.shutdown(threadPool, SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS, toString());
+ ExecutorServices.shutdown(threadPool, shutdownWaitMillis, TimeUnit.MILLISECONDS, toString());
try {
worker.join();
} catch (final InterruptedException ex) {
@@ -243,7 +245,7 @@ public class FlumePersistentManager extends FlumeAvroManager {
} catch (final Exception ex) {
logWarn("Failed to close environment", ex);
}
- super.releaseSub();
+ super.releaseSub(timeout, timeUnit);
}
private void doSend(final SimpleEvent event) {