You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by rg...@apache.org on 2013/07/10 01:30:11 UTC
svn commit: r1501611 - in /logging/log4j/log4j2/trunk:
flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
src/changes/changes.xml
Author: rgoers
Date: Tue Jul 9 23:30:11 2013
New Revision: 1501611
URL: http://svn.apache.org/r1501611
Log:
LOG4J2-300 - WriterThread was ending when no agents are available which caused an OutOfMemoryError.
Modified:
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
logging/log4j/log4j2/trunk/src/changes/changes.xml
Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java?rev=1501611&r1=1501610&r2=1501611&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java Tue Jul 9 23:30:11 2013
@@ -28,11 +28,10 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
@@ -83,7 +82,7 @@ public class FlumePersistentManager exte
private final WriterThread worker;
- private final LinkedBlockingQueue<byte []> queue = new LinkedBlockingQueue<byte[]>();
+ private final Gate gate = new Gate();
private final SecretKey secretKey;
@@ -113,7 +112,7 @@ public class FlumePersistentManager exte
this.delay = delay;
this.database = database;
this.environment = environment;
- this.worker = new WriterThread(database, environment, this, queue, batchSize, secretKey);
+ this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey);
this.worker.start();
this.secretKey = secretKey;
this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
@@ -133,9 +132,10 @@ public class FlumePersistentManager exte
* @param dataDir The location of the Berkeley database.
* @return A FlumeAvroManager.
*/
- public static FlumePersistentManager getManager(final String name, final Agent[] agents, final Property[] properties,
- int batchSize, final int retries, final int connectionTimeout,
- final int requestTimeout, final int delay, final String dataDir) {
+ public static FlumePersistentManager getManager(final String name, final Agent[] agents,
+ final Property[] properties, int batchSize, final int retries,
+ final int connectionTimeout, final int requestTimeout,
+ final int delay, final String dataDir) {
if (agents == null || agents.length == 0) {
throw new IllegalArgumentException("At least one agent is required");
}
@@ -184,7 +184,8 @@ public class FlumePersistentManager exte
cipher.init(Cipher.ENCRYPT_MODE, secretKey);
eventData = cipher.doFinal(eventData);
}
- final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database, queue));
+ final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
+ gate));
boolean interrupted = false;
int count = 0;
do {
@@ -244,15 +245,15 @@ public class FlumePersistentManager exte
private final byte[] keyData;
private final Environment environment;
private final Database database;
- private final LinkedBlockingQueue<byte[]> queue;
+ private final Gate gate;
- public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment, final Database database,
- final LinkedBlockingQueue<byte[]> queue) {
+ public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
+ final Database database, final Gate gate) {
this.keyData = keyData;
this.eventData = eventData;
this.environment = environment;
this.database = database;
- this.queue = queue;
+ this.gate = gate;
}
@Override
@@ -263,7 +264,7 @@ public class FlumePersistentManager exte
try {
database.put(txn, key, data);
txn.commit();
- queue.add(keyData);
+ gate.open();
} catch (final Exception ex) {
if (txn != null) {
txn.abort();
@@ -405,16 +406,16 @@ public class FlumePersistentManager exte
private final Database database;
private final Environment environment;
private final FlumePersistentManager manager;
- private final LinkedBlockingQueue<byte[]> queue;
+ private final Gate gate;
private final SecretKey secretKey;
private final int batchSize;
- public WriterThread(final Database database, final Environment environment, final FlumePersistentManager manager,
- final LinkedBlockingQueue<byte[]> queue, final int batchsize, final SecretKey secretKey) {
+ public WriterThread(final Database database, final Environment environment,
+ final FlumePersistentManager manager, final Gate gate, final int batchsize, final SecretKey secretKey) {
this.database = database;
this.environment = environment;
this.manager = manager;
- this.queue = queue;
+ this.gate = gate;
this.batchSize = batchsize;
this.secretKey = secretKey;
this.setDaemon(true);
@@ -423,9 +424,7 @@ public class FlumePersistentManager exte
public void shutdown() {
LOGGER.debug("Writer thread shutting down");
this.shutdown = true;
- if (queue.size() == 0) {
- queue.add(SHUTDOWN.getBytes(UTF8));
- }
+ gate.open();
}
public boolean isShutdown() {
@@ -445,7 +444,7 @@ public class FlumePersistentManager exte
DatabaseEntry key = new DatabaseEntry();
final DatabaseEntry data = new DatabaseEntry();
- queue.clear();
+ gate.close();
OperationStatus status;
if (batchSize > 1) {
Cursor cursor = database.openCursor(null, CursorConfig.DEFAULT);
@@ -464,26 +463,28 @@ public class FlumePersistentManager exte
manager.send(batch);
} catch (final Exception ioe) {
LOGGER.error("Error sending events", ioe);
- break;
+ errors = true;
}
- cursor.close();
- cursor = null;
- final Transaction txn = environment.beginTransaction(null, null);
- try {
- for (final Event event : batch.getEvents()) {
- try {
- final Map<String, String> headers = event.getHeaders();
- key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
- database.delete(txn, key);
- } catch (final Exception ex) {
- LOGGER.error("Error deleting key from database", ex);
+ if (!errors) {
+ cursor.close();
+ cursor = null;
+ final Transaction txn = environment.beginTransaction(null, null);
+ try {
+ for (final Event event : batch.getEvents()) {
+ try {
+ final Map<String, String> headers = event.getHeaders();
+ key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
+ database.delete(txn, key);
+ } catch (final Exception ex) {
+ LOGGER.error("Error deleting key from database", ex);
+ }
+ }
+ txn.commit();
+ } catch (final Exception ex) {
+ LOGGER.error("Unable to commit transaction", ex);
+ if (txn != null) {
+ txn.abort();
}
- }
- txn.commit();
- } catch (final Exception ex) {
- LOGGER.error("Unable to commit transaction", ex);
- if (txn != null) {
- txn.abort();
}
}
} catch (final Exception ex) {
@@ -548,7 +549,7 @@ public class FlumePersistentManager exte
while (!shutdown && (database.count() == 0 || database.count() < batchSize && nextBatch > now)) {
try {
final long interval = nextBatch - now;
- queue.poll(interval, TimeUnit.MILLISECONDS);
+ gate.waitForOpen(interval, TimeUnit.MILLISECONDS);
} catch (final InterruptedException ie) {
LOGGER.warn("WriterThread interrupted, continuing");
} catch (final Exception ex) {
@@ -623,6 +624,28 @@ public class FlumePersistentManager exte
}
return thread;
}
+ }
+
+ private static class Gate {
+ private static class Synchronizer extends AbstractQueuedSynchronizer {
+ boolean isSignalled() { return getState() != 0; }
+
+ protected int tryAcquireShared(int ignore) {
+ return isSignalled()? 1 : -1;
+ }
+ protected boolean tryReleaseShared(int state) {
+ setState(state);
+ return true;
+ }
+ }
+
+ private final Synchronizer sync = new Synchronizer();
+ public boolean isSignalled() { return sync.isSignalled(); }
+ public void open() { sync.releaseShared(1); }
+ public void close() { sync.releaseShared(0); }
+ public void waitForOpen(long timeout, TimeUnit timeUnit) throws InterruptedException {
+ sync.tryAcquireSharedNanos(1, timeUnit.toNanos(timeout));
+ }
}
}
Modified: logging/log4j/log4j2/trunk/src/changes/changes.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/src/changes/changes.xml?rev=1501611&r1=1501610&r2=1501611&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/src/changes/changes.xml (original)
+++ logging/log4j/log4j2/trunk/src/changes/changes.xml Tue Jul 9 23:30:11 2013
@@ -21,6 +21,9 @@
</properties>
<body>
<release version="2.0-beta8" date="2013-??-??" description="Bug fixes and enhancements">
+ <action issue="LOG4J2-300" dev="rgoers" type="fix">
+ WriterThread was ending when no agents are available which caused an OutOfMemoryError.
+ </action>
<action issue="LOG4J2-282" dev="rgoers" type="update">
Allow the default status level to be specified as a system property.
</action>