You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by rg...@apache.org on 2013/07/10 01:30:11 UTC

svn commit: r1501611 - in /logging/log4j/log4j2/trunk: flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java src/changes/changes.xml

Author: rgoers
Date: Tue Jul  9 23:30:11 2013
New Revision: 1501611

URL: http://svn.apache.org/r1501611
Log:
LOG4J2-300 - WriterThread was ending when no agents were available, which caused an OutOfMemoryError.

Modified:
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
    logging/log4j/log4j2/trunk/src/changes/changes.xml

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java?rev=1501611&r1=1501610&r2=1501611&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java Tue Jul  9 23:30:11 2013
@@ -28,11 +28,10 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 import javax.crypto.Cipher;
 import javax.crypto.SecretKey;
 
@@ -83,7 +82,7 @@ public class FlumePersistentManager exte
 
     private final WriterThread worker;
 
-    private final LinkedBlockingQueue<byte []> queue = new LinkedBlockingQueue<byte[]>();
+    private final Gate gate = new Gate();
 
     private final SecretKey secretKey;
 
@@ -113,7 +112,7 @@ public class FlumePersistentManager exte
         this.delay = delay;
         this.database = database;
         this.environment = environment;
-        this.worker = new WriterThread(database, environment, this, queue, batchSize, secretKey);
+        this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey);
         this.worker.start();
         this.secretKey = secretKey;
         this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
@@ -133,9 +132,10 @@ public class FlumePersistentManager exte
      * @param dataDir The location of the Berkeley database.
      * @return A FlumeAvroManager.
      */
-    public static FlumePersistentManager getManager(final String name, final Agent[] agents, final Property[] properties,
-                                                    int batchSize, final int retries, final int connectionTimeout,
-                                                    final int requestTimeout, final int delay, final String dataDir) {
+    public static FlumePersistentManager getManager(final String name, final Agent[] agents,
+                                                    final Property[] properties, int batchSize, final int retries,
+                                                    final int connectionTimeout, final int requestTimeout,
+                                                    final int delay, final String dataDir) {
         if (agents == null || agents.length == 0) {
             throw new IllegalArgumentException("At least one agent is required");
         }
@@ -184,7 +184,8 @@ public class FlumePersistentManager exte
                 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
                 eventData = cipher.doFinal(eventData);
             }
-            final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database, queue));
+            final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
+                gate));
             boolean interrupted = false;
             int count = 0;
             do {
@@ -244,15 +245,15 @@ public class FlumePersistentManager exte
         private final byte[] keyData;
         private final Environment environment;
         private final Database database;
-        private final LinkedBlockingQueue<byte[]> queue;
+        private final Gate gate;
 
-        public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment, final Database database,
-                         final LinkedBlockingQueue<byte[]> queue) {
+        public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
+                         final Database database, final Gate gate) {
             this.keyData = keyData;
             this.eventData = eventData;
             this.environment = environment;
             this.database = database;
-            this.queue = queue;
+            this.gate = gate;
         }
 
         @Override
@@ -263,7 +264,7 @@ public class FlumePersistentManager exte
             try {
                 database.put(txn, key, data);
                 txn.commit();
-                queue.add(keyData);
+                gate.open();
             } catch (final Exception ex) {
                 if (txn != null) {
                     txn.abort();
@@ -405,16 +406,16 @@ public class FlumePersistentManager exte
         private final Database database;
         private final Environment environment;
         private final FlumePersistentManager manager;
-        private final LinkedBlockingQueue<byte[]> queue;
+        private final Gate gate;
         private final SecretKey secretKey;
         private final int batchSize;
 
-        public WriterThread(final Database database, final Environment environment, final FlumePersistentManager manager,
-                            final LinkedBlockingQueue<byte[]> queue, final int batchsize, final SecretKey secretKey) {
+        public WriterThread(final Database database, final Environment environment,
+                            final FlumePersistentManager manager, final Gate gate, final int batchsize, final SecretKey secretKey) {
             this.database = database;
             this.environment = environment;
             this.manager = manager;
-            this.queue = queue;
+            this.gate = gate;
             this.batchSize = batchsize;
             this.secretKey = secretKey;
             this.setDaemon(true);
@@ -423,9 +424,7 @@ public class FlumePersistentManager exte
         public void shutdown() {
             LOGGER.debug("Writer thread shutting down");
             this.shutdown = true;
-            if (queue.size() == 0) {
-                queue.add(SHUTDOWN.getBytes(UTF8));
-            }
+            gate.open();
         }
 
         public boolean isShutdown() {
@@ -445,7 +444,7 @@ public class FlumePersistentManager exte
                         DatabaseEntry key = new DatabaseEntry();
                         final DatabaseEntry data = new DatabaseEntry();
 
-                        queue.clear();
+                        gate.close();
                         OperationStatus status;
                         if (batchSize > 1) {
                             Cursor cursor = database.openCursor(null, CursorConfig.DEFAULT);
@@ -464,26 +463,28 @@ public class FlumePersistentManager exte
                                     manager.send(batch);
                                 } catch (final Exception ioe) {
                                     LOGGER.error("Error sending events", ioe);
-                                    break;
+                                    errors = true;
                                 }
-                                cursor.close();
-                                cursor = null;
-                                final Transaction txn = environment.beginTransaction(null, null);
-                                try {
-                                    for (final Event event : batch.getEvents()) {
-                                        try {
-                                            final Map<String, String> headers = event.getHeaders();
-                                            key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
-                                            database.delete(txn, key);
-                                        } catch (final Exception ex) {
-                                            LOGGER.error("Error deleting key from database", ex);
+                                if (!errors) {
+                                    cursor.close();
+                                    cursor = null;
+                                    final Transaction txn = environment.beginTransaction(null, null);
+                                    try {
+                                        for (final Event event : batch.getEvents()) {
+                                            try {
+                                                final Map<String, String> headers = event.getHeaders();
+                                                key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
+                                                database.delete(txn, key);
+                                            } catch (final Exception ex) {
+                                                LOGGER.error("Error deleting key from database", ex);
+                                            }
+                                        }
+                                        txn.commit();
+                                    } catch (final Exception ex) {
+                                        LOGGER.error("Unable to commit transaction", ex);
+                                        if (txn != null) {
+                                            txn.abort();
                                         }
-                                    }
-                                    txn.commit();
-                                } catch (final Exception ex) {
-                                    LOGGER.error("Unable to commit transaction", ex);
-                                    if (txn != null) {
-                                        txn.abort();
                                     }
                                 }
                             } catch (final Exception ex) {
@@ -548,7 +549,7 @@ public class FlumePersistentManager exte
                     while (!shutdown && (database.count() == 0 || database.count() < batchSize && nextBatch > now)) {
                         try {
                             final long interval = nextBatch - now;
-                            queue.poll(interval, TimeUnit.MILLISECONDS);
+                            gate.waitForOpen(interval, TimeUnit.MILLISECONDS);
                         } catch (final InterruptedException ie) {
                             LOGGER.warn("WriterThread interrupted, continuing");
                         } catch (final Exception ex) {
@@ -623,6 +624,28 @@ public class FlumePersistentManager exte
             }
             return thread;
         }
+    }
+
+    private static class Gate {
+        private static class Synchronizer extends AbstractQueuedSynchronizer {
+            boolean isSignalled() { return getState() != 0; }
+
+            protected int tryAcquireShared(int ignore) {
+                return isSignalled()? 1 : -1;
+            }
 
+            protected boolean tryReleaseShared(int state) {
+                setState(state);
+                return true;
+            }
+        }
+
+        private final Synchronizer sync = new Synchronizer();
+        public boolean isSignalled() { return sync.isSignalled(); }
+        public void open()         { sync.releaseShared(1); }
+        public void close()        { sync.releaseShared(0); }
+        public void waitForOpen(long timeout, TimeUnit timeUnit) throws InterruptedException {
+            sync.tryAcquireSharedNanos(1, timeUnit.toNanos(timeout));
+        }
     }
 }

Modified: logging/log4j/log4j2/trunk/src/changes/changes.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/src/changes/changes.xml?rev=1501611&r1=1501610&r2=1501611&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/src/changes/changes.xml (original)
+++ logging/log4j/log4j2/trunk/src/changes/changes.xml Tue Jul  9 23:30:11 2013
@@ -21,6 +21,9 @@
   </properties>
   <body>
     <release version="2.0-beta8" date="2013-??-??" description="Bug fixes and enhancements">
+      <action issue="LOG4J2-300" dev="rgoers" type="fix">
+        WriterThread was ending when no agents are available which caused an OutOfMemoryError.
+      </action>
       <action issue="LOG4J2-282" dev="rgoers" type="update">
         Allow the default status level to be specified as a system property.
       </action>