Posted to commits@logging.apache.org by rg...@apache.org on 2013/09/10 22:47:29 UTC

svn commit: r1521640 - in /logging/log4j/log4j2/trunk: log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/ log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/ src/changes/ src/site/xdoc/manual/

Author: rgoers
Date: Tue Sep 10 20:47:28 2013
New Revision: 1521640

URL: http://svn.apache.org/r1521640
Log:
LOG4J2-391 - FlumePersistentManager now handles LockConflictExceptions in Berkeley Db.
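
For readers skimming the diff: the fix wraps each Berkeley DB transaction in a bounded retry loop that aborts the transaction and sleeps when a LockConflictException is raised, then rethrows (or logs) the last conflict if every attempt fails. Below is a minimal, self-contained sketch of that shape; the class and method names are illustrative only, and the real code in FlumePersistentManager additionally closes cursors, opens the batch gate, and aborts and rethrows on other exceptions.

    import com.sleepycat.je.Database;
    import com.sleepycat.je.DatabaseEntry;
    import com.sleepycat.je.Environment;
    import com.sleepycat.je.LockConflictException;
    import com.sleepycat.je.Transaction;

    // Illustrative helper, not part of the patch: shows the retry-on-lock-conflict
    // shape that BDBWriter.call() and the WriterThread now follow.
    final class LockConflictRetryExample {

        static void putWithRetry(final Environment environment, final Database database,
                                 final DatabaseEntry key, final DatabaseEntry data,
                                 final int lockTimeoutRetryCount) throws Exception {
            Exception lastConflict = null;
            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                Transaction txn = null;
                try {
                    txn = environment.beginTransaction(null, null);
                    database.put(txn, key, data);
                    txn.commit();
                    return;                             // committed, nothing to retry
                } catch (final LockConflictException lce) {
                    lastConflict = lce;                 // abort and try again
                    if (txn != null) {
                        try {
                            txn.abort();
                        } catch (final Exception ignored) {
                            // ignore abort failures while retrying
                        }
                    }
                }
                Thread.sleep(500);                      // LOCK_TIMEOUT_SLEEP_MILLIS in the patch
            }
            if (lastConflict != null) {
                throw lastConflict;                     // every attempt hit a lock conflict
            }
        }
    }

The number of attempts comes from the new lockTimeoutRetries plugin attribute (default 5), parsed in FlumeAppender.createAppender and passed through FlumePersistentManager.getManager.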

Modified:
    logging/log4j/log4j2/trunk/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
    logging/log4j/log4j2/trunk/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
    logging/log4j/log4j2/trunk/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
    logging/log4j/log4j2/trunk/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java
    logging/log4j/log4j2/trunk/src/changes/changes.xml
    logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml

Modified: logging/log4j/log4j2/trunk/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java?rev=1521640&r1=1521639&r2=1521640&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java (original)
+++ logging/log4j/log4j2/trunk/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java Tue Sep 10 20:47:28 2013
@@ -41,6 +41,8 @@ public final class FlumeAppender extends
     private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"};
     private static final int DEFAULT_MAX_DELAY = 60000;
 
+    private static final int DEFAULT_LOCK_TIMEOUT_RETRY_COUNT = 5;
+
     private final AbstractFlumeManager manager;
 
     private final String mdcIncludes;
@@ -149,6 +151,7 @@ public final class FlumeAppender extends
      * @param eventPrefix The prefix to add to event key names.
      * @param compressBody If true the event body will be compressed.
      * @param batchSize Number of events to include in a batch. Defaults to 1.
+     * @param lockTimeoutRetries Times to retry a lock timeout when writing to Berkeley DB.
      * @param factory The factory to use to create Flume events.
      * @param layout The layout to format the event.
      * @param filter A Filter to filter events.
@@ -157,26 +160,27 @@ public final class FlumeAppender extends
      */
     @PluginFactory
     public static FlumeAppender createAppender(@PluginElement("Agents") Agent[] agents,
-                                                   @PluginElement("Properties") final Property[] properties,
-                                                   @PluginAttribute("embedded") final String embedded,
-                                                   @PluginAttribute("type") final String type,
-                                                   @PluginAttribute("dataDir") final String dataDir,
-                                                   @PluginAttribute("connectTimeout") final String connectionTimeout,
-                                                   @PluginAttribute("requestTimeout") final String requestTimeout,
-                                                   @PluginAttribute("agentRetries") final String agentRetries,
-                                                   @PluginAttribute("maxDelay") final String maxDelay,
-                                                   @PluginAttribute("name") final String name,
-                                                   @PluginAttribute("ignoreExceptions") final String ignore,
-                                                   @PluginAttribute("mdcExcludes") final String excludes,
-                                                   @PluginAttribute("mdcIncludes") final String includes,
-                                                   @PluginAttribute("mdcRequired") final String required,
-                                                   @PluginAttribute("mdcPrefix") final String mdcPrefix,
-                                                   @PluginAttribute("eventPrefix") final String eventPrefix,
-                                                   @PluginAttribute("compress") final String compressBody,
-                                                   @PluginAttribute("batchSize") final String batchSize,
-                                                   @PluginElement("FlumeEventFactory") final FlumeEventFactory factory,
-                                                   @PluginElement("Layout") Layout<? extends Serializable> layout,
-                                                   @PluginElement("Filters") final Filter filter) {
+                                               @PluginElement("Properties") final Property[] properties,
+                                               @PluginAttribute("embedded") final String embedded,
+                                               @PluginAttribute("type") final String type,
+                                               @PluginAttribute("dataDir") final String dataDir,
+                                               @PluginAttribute("connectTimeout") final String connectionTimeout,
+                                               @PluginAttribute("requestTimeout") final String requestTimeout,
+                                               @PluginAttribute("agentRetries") final String agentRetries,
+                                               @PluginAttribute("maxDelay") final String maxDelay,
+                                               @PluginAttribute("name") final String name,
+                                               @PluginAttribute("ignoreExceptions") final String ignore,
+                                               @PluginAttribute("mdcExcludes") final String excludes,
+                                               @PluginAttribute("mdcIncludes") final String includes,
+                                               @PluginAttribute("mdcRequired") final String required,
+                                               @PluginAttribute("mdcPrefix") final String mdcPrefix,
+                                               @PluginAttribute("eventPrefix") final String eventPrefix,
+                                               @PluginAttribute("compress") final String compressBody,
+                                               @PluginAttribute("batchSize") final String batchSize,
+                                               @PluginAttribute("lockTimeoutRetries") final String lockTimeoutRetries,
+                                               @PluginElement("FlumeEventFactory") final FlumeEventFactory factory,
+                                               @PluginElement("Layout") Layout<? extends Serializable> layout,
+                                               @PluginElement("Filters") final Filter filter) {
 
         final boolean embed = embedded != null ? Boolean.parseBoolean(embedded) :
             (agents == null || agents.length == 0) && properties != null && properties.length > 0;
@@ -211,6 +215,7 @@ public final class FlumeAppender extends
         final int connectTimeout = Integers.parseInt(connectionTimeout, 0);
         final int reqTimeout = Integers.parseInt(requestTimeout, 0);
         final int retries = Integers.parseInt(agentRetries, 0);
+        final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT);
         final int delay = Integers.parseInt(maxDelay, DEFAULT_MAX_DELAY );
 
         if (layout == null) {
@@ -242,7 +247,7 @@ public final class FlumeAppender extends
                     agents = new Agent[] {Agent.createAgent(null, null)};
                 }
                 manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries,
-                    connectTimeout, reqTimeout, delay, dataDir);
+                    connectTimeout, reqTimeout, delay, lockTimeoutRetryCount, dataDir);
                 break;
             default:
                 LOGGER.debug("No manager type specified. Defaulting to AVRO");

Modified: logging/log4j/log4j2/trunk/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java?rev=1521640&r1=1521639&r2=1521640&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java (original)
+++ logging/log4j/log4j2/trunk/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java Tue Sep 10 20:47:28 2013
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.Atomi
 import javax.crypto.Cipher;
 import javax.crypto.SecretKey;
 
+import com.sleepycat.je.LockConflictException;
 import org.apache.flume.Event;
 import org.apache.flume.event.SimpleEvent;
 import org.apache.logging.log4j.LoggingException;
@@ -76,6 +77,8 @@ public class FlumePersistentManager exte
 
     private static final int MILLIS_PER_SECOND = 1000;
 
+    private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
+
     private static BDBManagerFactory factory = new BDBManagerFactory();
 
     private final Database database;
@@ -90,6 +93,8 @@ public class FlumePersistentManager exte
 
     private final int delay;
 
+    private final int lockTimeoutRetryCount;
+
     private final ExecutorService threadPool;
 
     private AtomicLong dbCount = new AtomicLong();
@@ -107,20 +112,24 @@ public class FlumePersistentManager exte
      * @param database The database to write to.
      * @param environment The database environment.
      * @param secretKey The SecretKey to use for encryption.
+     * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
      */
     protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
                                      final int batchSize, final int retries, final int connectionTimeout,
                                      final int requestTimeout, final int delay, final Database database,
-                                     final Environment environment, final SecretKey secretKey) {
+                                     final Environment environment, final SecretKey secretKey,
+                                     final int lockTimeoutRetryCount) {
         super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
         this.delay = delay;
         this.database = database;
         this.environment = environment;
         dbCount.set(database.count());
-        this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount);
+        this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
+            lockTimeoutRetryCount);
         this.worker.start();
         this.secretKey = secretKey;
         this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
+        this.lockTimeoutRetryCount = lockTimeoutRetryCount;
     }
 
 
@@ -140,7 +149,8 @@ public class FlumePersistentManager exte
     public static FlumePersistentManager getManager(final String name, final Agent[] agents,
                                                     final Property[] properties, int batchSize, final int retries,
                                                     final int connectionTimeout, final int requestTimeout,
-                                                    final int delay, final String dataDir) {
+                                                    final int delay, final int lockTimeoutRetryCount,
+                                                    final String dataDir) {
         if (agents == null || agents.length == 0) {
             throw new IllegalArgumentException("At least one agent is required");
         }
@@ -162,7 +172,7 @@ public class FlumePersistentManager exte
         sb.append("]");
         sb.append(" ").append(dataDirectory);
         return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
-            connectionTimeout, requestTimeout, delay, dataDir, properties));
+            connectionTimeout, requestTimeout, delay, lockTimeoutRetryCount, dataDir, properties));
     }
 
     @Override
@@ -190,7 +200,7 @@ public class FlumePersistentManager exte
                 eventData = cipher.doFinal(eventData);
             }
             final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
-                gate, dbCount, getBatchSize()));
+                gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
             boolean interrupted = false;
             int count = 0;
             do {
@@ -258,9 +268,11 @@ public class FlumePersistentManager exte
         private final Gate gate;
         private final AtomicLong dbCount;
         private final long batchSize;
+        private final int lockTimeoutRetryCount;
 
         public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
-                         final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize) {
+                         final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
+                         final int lockTimeoutRetryCount) {
             this.keyData = keyData;
             this.eventData = eventData;
             this.environment = environment;
@@ -268,24 +280,61 @@ public class FlumePersistentManager exte
             this.gate = gate;
             this.dbCount = dbCount;
             this.batchSize = batchSize;
+            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
         }
 
         @Override
         public Integer call() throws Exception {
             final DatabaseEntry key = new DatabaseEntry(keyData);
             final DatabaseEntry data = new DatabaseEntry(eventData);
-            final Transaction txn = environment.beginTransaction(null, null);
-            try {
-                database.put(txn, key, data);
-                txn.commit();
-                if (dbCount.incrementAndGet() >= batchSize) {
-                    gate.open();
+            Exception exception = null;
+            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
+                Transaction txn = null;
+                try {
+                    txn = environment.beginTransaction(null, null);
+                    try {
+                        database.put(txn, key, data);
+                        txn.commit();
+                        txn = null;
+                        if (dbCount.incrementAndGet() >= batchSize) {
+                            gate.open();
+                        }
+                        exception = null;
+                        break;
+                    } catch (final LockConflictException lce) {
+                        exception = lce;
+                        // Fall through and retry.
+                    } catch (final Exception ex) {
+                        if (txn != null) {
+                            txn.abort();
+                        }
+                        throw ex;
+                    } finally {
+                        if (txn != null) {
+                            txn.abort();
+                            txn = null;
+                        }
+                    }
+                } catch (LockConflictException lce) {
+                    exception = lce;
+                    if (txn != null) {
+                        try {
+                            txn.abort();
+                            txn = null;
+                        } catch (Exception ex) {
+                            // Ignore exception
+                        }
+                    }
+
                 }
-            } catch (final Exception ex) {
-                if (txn != null) {
-                    txn.abort();
+                try {
+                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
+                } catch (InterruptedException ie) {
+                    // Ignore the error
                 }
-                throw ex;
+            }
+            if (exception != null) {
+                throw exception;
             }
             return eventData.length;
         }
@@ -303,6 +352,7 @@ public class FlumePersistentManager exte
         private final int connectionTimeout;
         private final int requestTimeout;
         private final int delay;
+        private final int lockTimeoutRetryCount;
         private final Property[] properties;
 
         /**
@@ -314,7 +364,7 @@ public class FlumePersistentManager exte
          */
         public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
                            final int connectionTimeout, final int requestTimeout, final int delay,
-                           final String dataDir, final Property[] properties) {
+                           final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
             this.name = name;
             this.agents = agents;
             this.batchSize = batchSize;
@@ -323,6 +373,7 @@ public class FlumePersistentManager exte
             this.connectionTimeout = connectionTimeout;
             this.requestTimeout = requestTimeout;
             this.delay = delay;
+            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
             this.properties = properties;
         }
     }
@@ -410,7 +461,8 @@ public class FlumePersistentManager exte
                 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
             }
             return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
-                data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey);
+                data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey,
+                data.lockTimeoutRetryCount);
         }
     }
 
@@ -426,10 +478,11 @@ public class FlumePersistentManager exte
         private final SecretKey secretKey;
         private final int batchSize;
         private final AtomicLong dbCounter;
+        private final int lockTimeoutRetryCount;
 
         public WriterThread(final Database database, final Environment environment,
                             final FlumePersistentManager manager, final Gate gate, final int batchsize,
-                            final SecretKey secretKey, final AtomicLong dbCount) {
+                            final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
             this.database = database;
             this.environment = environment;
             this.manager = manager;
@@ -438,6 +491,7 @@ public class FlumePersistentManager exte
             this.secretKey = secretKey;
             this.setDaemon(true);
             this.dbCounter = dbCount;
+            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
         }
 
         public void shutdown() {
@@ -474,47 +528,88 @@ public class FlumePersistentManager exte
                                 break;
                             }
                         } else {
-                            Transaction txn = environment.beginTransaction(null, null);
-                            Cursor cursor = database.openCursor(txn, null);
-                            try {
-                                status = cursor.getFirst(key, data, LockMode.RMW);
-                                while (status == OperationStatus.SUCCESS) {
-                                    final SimpleEvent event = createEvent(data);
-                                    if (event != null) {
+                            Exception exception = null;
+                            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
+                                exception = null;
+                                Transaction txn = null;
+                                Cursor cursor = null;
+                                try {
+                                    txn = environment.beginTransaction(null, null);
+                                    cursor = database.openCursor(txn, null);
+                                    try {
+                                        status = cursor.getFirst(key, data, LockMode.RMW);
+                                        while (status == OperationStatus.SUCCESS) {
+                                            final SimpleEvent event = createEvent(data);
+                                            if (event != null) {
+                                                try {
+                                                    manager.doSend(event);
+                                                } catch (final Exception ioe) {
+                                                    errors = true;
+                                                    LOGGER.error("Error sending event", ioe);
+                                                    break;
+                                                }
+                                                try {
+                                                    cursor.delete();
+                                                } catch (final Exception ex) {
+                                                    LOGGER.error("Unable to delete event", ex);
+                                                }
+                                            }
+                                            status = cursor.getNext(key, data, LockMode.RMW);
+                                        }
+                                        if (cursor != null) {
+                                            cursor.close();
+                                            cursor = null;
+                                        }
+                                        txn.commit();
+                                        txn = null;
+                                        dbCounter.decrementAndGet();
+                                        exception = null;
+                                        break;
+                                    } catch (final LockConflictException lce) {
+                                        exception = lce;
+                                        // Fall through and retry.
+                                    } catch (final Exception ex) {
+                                        LOGGER.error("Error reading or writing to database", ex);
+                                        shutdown = true;
+                                        break;
+                                    } finally {
+                                        if (cursor != null) {
+                                            cursor.close();
+                                            cursor = null;
+                                        }
+                                        if (txn != null) {
+                                            txn.abort();
+                                            txn = null;
+                                        }
+                                    }
+                                } catch (LockConflictException lce) {
+                                    exception = lce;
+                                    if (cursor != null) {
                                         try {
-                                            manager.doSend(event);
-                                        } catch (final Exception ioe) {
-                                            errors = true;
-                                            LOGGER.error("Error sending event", ioe);
-                                            break;
+                                            cursor.close();
+                                            cursor = null;
+                                        } catch (Exception ex) {
+                                            // Ignore exception
                                         }
+                                    }
+                                    if (txn != null) {
                                         try {
-                                            cursor.delete();
-                                        } catch (final Exception ex) {
-                                            LOGGER.error("Unable to delete event", ex);
+                                            txn.abort();
+                                            txn = null;
+                                        } catch (Exception ex) {
+                                            // Ignore exception
                                         }
                                     }
-                                    status = cursor.getNext(key, data, LockMode.RMW);
-                                }
-                                if (cursor != null) {
-                                    cursor.close();
-                                    cursor = null;
                                 }
-                                txn.commit();
-                                txn = null;
-                                dbCounter.decrementAndGet();
-                            } catch (final Exception ex) {
-                                LOGGER.error("Error reading or writing to database", ex);
-                                shutdown = true;
-                                break;
-                            } finally {
-                                if (cursor != null) {
-                                    cursor.close();
-                                }
-                                if (txn != null) {
-                                    txn.abort();
+                                try {
+                                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
+                                } catch (InterruptedException ie) {
+                                    // Ignore the error
                                 }
                             }
+                            if (exception != null) {
+                                LOGGER.error("Unable to read or update data base", exception);
+                            }
                         }
                         if (errors) {
                             Thread.sleep(manager.delay);
@@ -575,28 +670,89 @@ public class FlumePersistentManager exte
                 if (!errors) {
                     cursor.close();
                     cursor = null;
-                    final Transaction txn = environment.beginTransaction(null, null);
-                    try {
-                        for (final Event event : batch.getEvents()) {
+                    Transaction txn = null;
+                    Exception exception = null;
+                    for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
+                        try {
+                            txn = environment.beginTransaction(null, null);
                             try {
-                                final Map<String, String> headers = event.getHeaders();
-                                key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
-                                database.delete(txn, key);
+                                for (final Event event : batch.getEvents()) {
+                                    try {
+                                        final Map<String, String> headers = event.getHeaders();
+                                        key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
+                                        database.delete(txn, key);
+                                    } catch (final Exception ex) {
+                                        LOGGER.error("Error deleting key from database", ex);
+                                    }
+                                }
+                                txn.commit();
+                                long count = dbCounter.get();
+                                while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
+                                    count = dbCounter.get();
+                                }
+                                exception = null;
+                                break;
+                            } catch (final LockConflictException lce) {
+                                exception = lce;
+                                if (cursor != null) {
+                                    try {
+                                        cursor.close();
+                                        cursor = null;
+                                    } catch (Exception ex) {
+                                        // Ignore exception
+                                    }
+                                }
+                                if (txn != null) {
+                                    try {
+                                        txn.abort();
+                                        txn = null;
+                                    } catch (Exception ex) {
+                                        // Ignore exception
+                                    }
+                                }
                             } catch (final Exception ex) {
-                                LOGGER.error("Error deleting key from database", ex);
+                                LOGGER.error("Unable to commit transaction", ex);
+                                if (txn != null) {
+                                    txn.abort();
+                                }
+                            }
+                        } catch (LockConflictException lce) {
+                            exception = lce;
+                            if (cursor != null) {
+                                try {
+                                    cursor.close();
+                                    cursor = null;
+                                } catch (Exception ex) {
+                                    // Ignore exception
+                                }
+                            }
+                            if (txn != null) {
+                                try {
+                                    txn.abort();
+                                    txn = null;
+                                } catch (Exception ex) {
+                                    // Ignore exception
+                                }
+                            }
+                        } finally {
+                            if (cursor != null) {
+                                cursor.close();
+                                cursor = null;
+                            }
+                            if (txn != null) {
+                                txn.abort();
+                                txn = null;
                             }
                         }
-                        txn.commit();
-                        long count = dbCounter.get();
-                        while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
-                            count = dbCounter.get();
-                        }
-                    } catch (final Exception ex) {
-                        LOGGER.error("Unable to commit transaction", ex);
-                        if (txn != null) {
-                            txn.abort();
+                        try {
+                            Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
+                        } catch (InterruptedException ie) {
+                            // Ignore the error
                         }
                     }
+                    if (exception != null) {
+                        LOGGER.error("Unable to delete events from data base", exception);
+                    }
                 }
             } catch (final Exception ex) {
                 LOGGER.error("Error reading database", ex);
@@ -607,6 +763,7 @@ public class FlumePersistentManager exte
                     cursor.close();
                 }
             }
+
             return errors;
         }
 

Modified: logging/log4j/log4j2/trunk/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java?rev=1521640&r1=1521639&r2=1521640&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java (original)
+++ logging/log4j/log4j2/trunk/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java Tue Sep 10 20:47:28 2013
@@ -134,7 +134,7 @@ public class FlumeAppenderTest {
         final FlumeAppender avroAppender = FlumeAppender.createAppender(agents,
                 null, "false", "Avro", null, "1000", "1000", "1", "1000",
                 "avro", "false", null, null, null, null, null, "true", "1",
-                null, null, null);
+                null, null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);
@@ -163,7 +163,7 @@ public class FlumeAppenderTest {
         final FlumeAppender avroAppender = FlumeAppender.createAppender(agents,
                 null, "false", "Avro", null, "1000", "1000", "1", "1000",
                 "avro", "false", null, null, null, "ReqCtx_", null, "true",
-                "1", null, null, null);
+                "1", null, null, null, null);
         avroAppender.start();
         final Logger eventLogger = (Logger) LogManager.getLogger("EventLogger");
         Assert.assertNotNull(eventLogger);
@@ -203,7 +203,7 @@ public class FlumeAppenderTest {
         final FlumeAppender avroAppender = FlumeAppender.createAppender(agents,
                 null, "false", "Avro", null, "1000", "1000", "1", "1000",
                 "avro", "false", null, null, null, null, null, "true", "1",
-                null, null, null);
+                null, null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);
@@ -237,7 +237,7 @@ public class FlumeAppenderTest {
         final FlumeAppender avroAppender = FlumeAppender.createAppender(agents,
                 null, "false", "Avro", null, "1000", "1000", "1", "1000",
                 "avro", "false", null, null, null, null, null, "true", "10",
-                null, null, null);
+                null, null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);
@@ -271,7 +271,7 @@ public class FlumeAppenderTest {
         final FlumeAppender avroAppender = FlumeAppender.createAppender(agents,
                 null, "false", "Avro", null, "1000", "1000", "1", "1000",
                 "avro", "false", null, null, null, null, null, "true", "1",
-                null, null, null);
+                null, null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);
@@ -301,7 +301,7 @@ public class FlumeAppenderTest {
         final FlumeAppender avroAppender = FlumeAppender.createAppender(agents,
                 null, "false", "Avro", null, "1000", "1000", "1", "1000",
                 "avro", "false", null, null, null, null, null, "true", "1",
-                null, null, null);
+                null, null, null, null);
         avroAppender.start();
         Assert.assertTrue("Appender Not started", avroAppender.isStarted());
         avroLogger.addAppender(avroAppender);
@@ -349,7 +349,7 @@ public class FlumeAppenderTest {
         final FlumeAppender avroAppender = FlumeAppender.createAppender(agents,
                 null, "false", "Avro", null, "1000", "1000", "1", "1000",
                 "avro", "false", null, null, null, null, null, "true", "1",
-                null, null, null);
+                null, null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);

Modified: logging/log4j/log4j2/trunk/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java?rev=1521640&r1=1521639&r2=1521640&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java (original)
+++ logging/log4j/log4j2/trunk/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java Tue Sep 10 20:47:28 2013
@@ -233,6 +233,102 @@ public class FlumePersistentAppenderTest
         Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
             body.endsWith("This is a test message"));
     }
+
+    @Test
+    public void testMultipleConcurrent() throws InterruptedException, IOException {
+
+        final int eventsCount = 10000;
+
+        Thread writer1 = new WriterThread(0, eventsCount / 4);
+        Thread writer2 = new WriterThread(eventsCount / 4, eventsCount / 2);
+        Thread writer3 = new WriterThread(eventsCount / 2, (3 * eventsCount) / 4);
+        Thread writer4 = new WriterThread((3 * eventsCount) / 4, eventsCount);
+        writer1.start();
+        writer2.start();
+        writer3.start();
+        writer4.start();
+
+
+        final boolean[] fields = new boolean[eventsCount];
+        Thread reader1 = new ReaderThread(0, eventsCount / 4, fields);
+        Thread reader2 = new ReaderThread(eventsCount / 4, eventsCount / 2, fields);
+        Thread reader3 = new ReaderThread(eventsCount / 2, (eventsCount * 3) / 4, fields);
+        Thread reader4 = new ReaderThread((eventsCount * 3) / 4, eventsCount, fields);
+
+        reader1.start();
+        reader2.start();
+        reader3.start();
+        reader4.start();
+
+        writer1.join();
+        writer2.join();
+        writer3.join();
+        writer4.join();
+        reader1.join();
+        reader2.join();
+        reader3.join();
+        reader4.join();
+
+        for (int i = 0; i < eventsCount; ++i) {
+            Assert.assertTrue(
+                "Channel contained event, but not expected message " + i,
+                fields[i]);
+        }
+    }
+
+    private class WriterThread extends Thread {
+
+        private final int start;
+        private final int stop;
+
+        public WriterThread(int start, int stop) {
+            this.start = start;
+            this.stop = stop;
+        }
+
+        public void run() {
+            for (int i = start; i < stop; ++i) {
+                final StructuredDataMessage msg = new StructuredDataMessage(
+                    "Test", "Test Multiple " + i, "Test");
+                msg.put("counter", Integer.toString(i));
+                EventLogger.logEvent(msg);
+            }
+        }
+    }
+
+    private class ReaderThread extends Thread {
+        private final int start;
+        private final int stop;
+        private final boolean[] fields;
+
+        private ReaderThread(int start, int stop, boolean[] fields) {
+            this.start = start;
+            this.stop = stop;
+            this.fields = fields;
+        }
+        public void run() {
+
+            for (int i = start; i < stop; ++i) {
+                Event event = primary.poll();
+                while (event == null) {
+                    event = primary.poll();
+                }
+
+                Assert.assertNotNull("Received " + i + " events. Event "
+                    + (i + 1) + " is null", event);
+                final String value = event.getHeaders().get("counter");
+                Assert.assertNotNull("Missing counter", value);
+                final int counter = Integer.parseInt(value);
+                if (fields[counter]) {
+                    Assert.fail("Duplicate event");
+                } else {
+                    fields[counter] = true;
+                }
+
+            }
+        }
+    }
+
     /*
     @Test
     public void testPerformance() throws Exception {
@@ -319,7 +415,7 @@ public class FlumePersistentAppenderTest
         public Status appendBatch(final List<AvroFlumeEvent> events) throws AvroRemoteException {
             Preconditions.checkState(eventQueue.addAll(events));
             for (final AvroFlumeEvent event : events) {
-               // System.out.println("Received event " + event.getHeaders().get(new org.apache.avro.util.Utf8(FlumeEvent.GUID)));
+                // System.out.println("Received event " + event.getHeaders().get(new org.apache.avro.util.Utf8(FlumeEvent.GUID)));
             }
             return Status.OK;
         }

Modified: logging/log4j/log4j2/trunk/src/changes/changes.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/src/changes/changes.xml?rev=1521640&r1=1521639&r2=1521640&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/src/changes/changes.xml (original)
+++ logging/log4j/log4j2/trunk/src/changes/changes.xml Tue Sep 10 20:47:28 2013
@@ -40,6 +40,9 @@
         reversed (previous "true"s should become "false"s, and vice versa). Since this was an undocumented attribute up
         until now, it's unlikely this change will affect any users.
       </action>
+      <action issue="LOG4J2-391" dev="rgoers" type="fix" due-to="Kamal Bahadur">
+        FlumePersistentManager now handles LockConflictExceptions in Berkeley Db.
+      </action>
       <action issue="LOG4J2-338" dev="rgoers" type="add" due-to="Tibor Benke">
         Add TLSAppender. Also added missing license headers to several files.
       </action>

Modified: logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml?rev=1521640&r1=1521639&r2=1521640&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml (original)
+++ logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml Tue Sep 10 20:47:28 2013
@@ -850,6 +850,12 @@
               <td>The Layout to use to format the LogEvent. If no layout is specified RFC5424Layout will be used.</td>
             </tr>
             <tr>
+              <td>lockTimeoutRetries</td>
+              <td>integer</td>
+              <td>The number of times to retry if a LockConflictException occurs while writing to Berkeley DB. The
+                default is 5.</td>
+            </tr>
+            <tr>
               <td>maxDelay</td>
               <td>integer</td>
               <td>The maximum number of seconds to wait for batchSize events before publishing the batch.</td>
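
For reference, a configuration sketch (host, port, and layout values are illustrative, not taken from the patch) showing where the new lockTimeoutRetries attribute would appear on a persistent Flume appender:

    <Flume name="eventLogger" type="persistent" dataDir="./flumeData" lockTimeoutRetries="10">
      <Agent host="192.168.10.101" port="8800"/>
      <RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>
    </Flume>

If the attribute is omitted, the appender falls back to the default of 5 retries noted in the table above.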