You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by rg...@apache.org on 2013/09/10 22:47:29 UTC
svn commit: r1521640 - in /logging/log4j/log4j2/trunk:
log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/
log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/
src/changes/ src/site/xdoc/manual/
Author: rgoers
Date: Tue Sep 10 20:47:28 2013
New Revision: 1521640
URL: http://svn.apache.org/r1521640
Log:
LOG4J2-391 - FlumePersistentManager now handles LockConflictExceptions in Berkeley Db.
Modified:
logging/log4j/log4j2/trunk/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
logging/log4j/log4j2/trunk/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
logging/log4j/log4j2/trunk/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
logging/log4j/log4j2/trunk/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java
logging/log4j/log4j2/trunk/src/changes/changes.xml
logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml
Modified: logging/log4j/log4j2/trunk/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java?rev=1521640&r1=1521639&r2=1521640&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java (original)
+++ logging/log4j/log4j2/trunk/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java Tue Sep 10 20:47:28 2013
@@ -41,6 +41,8 @@ public final class FlumeAppender extends
private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"};
private static final int DEFAULT_MAX_DELAY = 60000;
+ private static final int DEFAULT_LOCK_TIMEOUT_RETRY_COUNT = 5;
+
private final AbstractFlumeManager manager;
private final String mdcIncludes;
@@ -149,6 +151,7 @@ public final class FlumeAppender extends
* @param eventPrefix The prefix to add to event key names.
* @param compressBody If true the event body will be compressed.
* @param batchSize Number of events to include in a batch. Defaults to 1.
+ * @param lockTimeoutRetries Times to retry a lock timeout when writing to Berkeley DB.
* @param factory The factory to use to create Flume events.
* @param layout The layout to format the event.
* @param filter A Filter to filter events.
@@ -157,26 +160,27 @@ public final class FlumeAppender extends
*/
@PluginFactory
public static FlumeAppender createAppender(@PluginElement("Agents") Agent[] agents,
- @PluginElement("Properties") final Property[] properties,
- @PluginAttribute("embedded") final String embedded,
- @PluginAttribute("type") final String type,
- @PluginAttribute("dataDir") final String dataDir,
- @PluginAttribute("connectTimeout") final String connectionTimeout,
- @PluginAttribute("requestTimeout") final String requestTimeout,
- @PluginAttribute("agentRetries") final String agentRetries,
- @PluginAttribute("maxDelay") final String maxDelay,
- @PluginAttribute("name") final String name,
- @PluginAttribute("ignoreExceptions") final String ignore,
- @PluginAttribute("mdcExcludes") final String excludes,
- @PluginAttribute("mdcIncludes") final String includes,
- @PluginAttribute("mdcRequired") final String required,
- @PluginAttribute("mdcPrefix") final String mdcPrefix,
- @PluginAttribute("eventPrefix") final String eventPrefix,
- @PluginAttribute("compress") final String compressBody,
- @PluginAttribute("batchSize") final String batchSize,
- @PluginElement("FlumeEventFactory") final FlumeEventFactory factory,
- @PluginElement("Layout") Layout<? extends Serializable> layout,
- @PluginElement("Filters") final Filter filter) {
+ @PluginElement("Properties") final Property[] properties,
+ @PluginAttribute("embedded") final String embedded,
+ @PluginAttribute("type") final String type,
+ @PluginAttribute("dataDir") final String dataDir,
+ @PluginAttribute("connectTimeout") final String connectionTimeout,
+ @PluginAttribute("requestTimeout") final String requestTimeout,
+ @PluginAttribute("agentRetries") final String agentRetries,
+ @PluginAttribute("maxDelay") final String maxDelay,
+ @PluginAttribute("name") final String name,
+ @PluginAttribute("ignoreExceptions") final String ignore,
+ @PluginAttribute("mdcExcludes") final String excludes,
+ @PluginAttribute("mdcIncludes") final String includes,
+ @PluginAttribute("mdcRequired") final String required,
+ @PluginAttribute("mdcPrefix") final String mdcPrefix,
+ @PluginAttribute("eventPrefix") final String eventPrefix,
+ @PluginAttribute("compress") final String compressBody,
+ @PluginAttribute("batchSize") final String batchSize,
+ @PluginAttribute("lockTimeoutRetries") final String lockTimeoutRetries,
+ @PluginElement("FlumeEventFactory") final FlumeEventFactory factory,
+ @PluginElement("Layout") Layout<? extends Serializable> layout,
+ @PluginElement("Filters") final Filter filter) {
final boolean embed = embedded != null ? Boolean.parseBoolean(embedded) :
(agents == null || agents.length == 0) && properties != null && properties.length > 0;
@@ -211,6 +215,7 @@ public final class FlumeAppender extends
final int connectTimeout = Integers.parseInt(connectionTimeout, 0);
final int reqTimeout = Integers.parseInt(requestTimeout, 0);
final int retries = Integers.parseInt(agentRetries, 0);
+ final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT);
final int delay = Integers.parseInt(maxDelay, DEFAULT_MAX_DELAY );
if (layout == null) {
@@ -242,7 +247,7 @@ public final class FlumeAppender extends
agents = new Agent[] {Agent.createAgent(null, null)};
}
manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries,
- connectTimeout, reqTimeout, delay, dataDir);
+ connectTimeout, reqTimeout, delay, lockTimeoutRetryCount, dataDir);
break;
default:
LOGGER.debug("No manager type specified. Defaulting to AVRO");
Modified: logging/log4j/log4j2/trunk/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java?rev=1521640&r1=1521639&r2=1521640&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java (original)
+++ logging/log4j/log4j2/trunk/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java Tue Sep 10 20:47:28 2013
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.Atomi
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
+import com.sleepycat.je.LockConflictException;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.logging.log4j.LoggingException;
@@ -76,6 +77,8 @@ public class FlumePersistentManager exte
private static final int MILLIS_PER_SECOND = 1000;
+ private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
+
private static BDBManagerFactory factory = new BDBManagerFactory();
private final Database database;
@@ -90,6 +93,8 @@ public class FlumePersistentManager exte
private final int delay;
+ private final int lockTimeoutRetryCount;
+
private final ExecutorService threadPool;
private AtomicLong dbCount = new AtomicLong();
@@ -107,20 +112,24 @@ public class FlumePersistentManager exte
* @param database The database to write to.
* @param environment The database environment.
* @param secretKey The SecretKey to use for encryption.
+ * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
*/
protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
final int batchSize, final int retries, final int connectionTimeout,
final int requestTimeout, final int delay, final Database database,
- final Environment environment, final SecretKey secretKey) {
+ final Environment environment, final SecretKey secretKey,
+ final int lockTimeoutRetryCount) {
super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
this.delay = delay;
this.database = database;
this.environment = environment;
dbCount.set(database.count());
- this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount);
+ this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
+ lockTimeoutRetryCount);
this.worker.start();
this.secretKey = secretKey;
this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
+ this.lockTimeoutRetryCount = lockTimeoutRetryCount;
}
@@ -140,7 +149,8 @@ public class FlumePersistentManager exte
public static FlumePersistentManager getManager(final String name, final Agent[] agents,
final Property[] properties, int batchSize, final int retries,
final int connectionTimeout, final int requestTimeout,
- final int delay, final String dataDir) {
+ final int delay, final int lockTimeoutRetryCount,
+ final String dataDir) {
if (agents == null || agents.length == 0) {
throw new IllegalArgumentException("At least one agent is required");
}
@@ -162,7 +172,7 @@ public class FlumePersistentManager exte
sb.append("]");
sb.append(" ").append(dataDirectory);
return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
- connectionTimeout, requestTimeout, delay, dataDir, properties));
+ connectionTimeout, requestTimeout, delay, lockTimeoutRetryCount, dataDir, properties));
}
@Override
@@ -190,7 +200,7 @@ public class FlumePersistentManager exte
eventData = cipher.doFinal(eventData);
}
final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
- gate, dbCount, getBatchSize()));
+ gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
boolean interrupted = false;
int count = 0;
do {
@@ -258,9 +268,11 @@ public class FlumePersistentManager exte
private final Gate gate;
private final AtomicLong dbCount;
private final long batchSize;
+ private final int lockTimeoutRetryCount;
public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
- final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize) {
+ final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
+ final int lockTimeoutRetryCount) {
this.keyData = keyData;
this.eventData = eventData;
this.environment = environment;
@@ -268,24 +280,61 @@ public class FlumePersistentManager exte
this.gate = gate;
this.dbCount = dbCount;
this.batchSize = batchSize;
+ this.lockTimeoutRetryCount = lockTimeoutRetryCount;
}
@Override
public Integer call() throws Exception {
final DatabaseEntry key = new DatabaseEntry(keyData);
final DatabaseEntry data = new DatabaseEntry(eventData);
- final Transaction txn = environment.beginTransaction(null, null);
- try {
- database.put(txn, key, data);
- txn.commit();
- if (dbCount.incrementAndGet() >= batchSize) {
- gate.open();
+ Exception exception = null;
+ for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
+ Transaction txn = null;
+ try {
+ txn = environment.beginTransaction(null, null);
+ try {
+ database.put(txn, key, data);
+ txn.commit();
+ txn = null;
+ if (dbCount.incrementAndGet() >= batchSize) {
+ gate.open();
+ }
+ exception = null;
+ break;
+ } catch (final LockConflictException lce) {
+ exception = lce;
+ // Fall through and retry.
+ } catch (final Exception ex) {
+ if (txn != null) {
+ txn.abort();
+ }
+ throw ex;
+ } finally {
+ if (txn != null) {
+ txn.abort();
+ txn = null;
+ }
+ }
+ } catch (LockConflictException lce) {
+ exception = lce;
+ if (txn != null) {
+ try {
+ txn.abort();
+ txn = null;
+ } catch (Exception ex) {
+ // Ignore exception
+ }
+ }
+
}
- } catch (final Exception ex) {
- if (txn != null) {
- txn.abort();
+ try {
+ Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
+ } catch (InterruptedException ie) {
+ // Ignore the error
}
- throw ex;
+ }
+ if (exception != null) {
+ throw exception;
}
return eventData.length;
}
@@ -303,6 +352,7 @@ public class FlumePersistentManager exte
private final int connectionTimeout;
private final int requestTimeout;
private final int delay;
+ private final int lockTimeoutRetryCount;
private final Property[] properties;
/**
@@ -314,7 +364,7 @@ public class FlumePersistentManager exte
*/
public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
final int connectionTimeout, final int requestTimeout, final int delay,
- final String dataDir, final Property[] properties) {
+ final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
this.name = name;
this.agents = agents;
this.batchSize = batchSize;
@@ -323,6 +373,7 @@ public class FlumePersistentManager exte
this.connectionTimeout = connectionTimeout;
this.requestTimeout = requestTimeout;
this.delay = delay;
+ this.lockTimeoutRetryCount = lockTimeoutRetryCount;
this.properties = properties;
}
}
@@ -410,7 +461,8 @@ public class FlumePersistentManager exte
LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
}
return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
- data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey);
+ data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey,
+ data.lockTimeoutRetryCount);
}
}
@@ -426,10 +478,11 @@ public class FlumePersistentManager exte
private final SecretKey secretKey;
private final int batchSize;
private final AtomicLong dbCounter;
+ private final int lockTimeoutRetryCount;
public WriterThread(final Database database, final Environment environment,
final FlumePersistentManager manager, final Gate gate, final int batchsize,
- final SecretKey secretKey, final AtomicLong dbCount) {
+ final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
this.database = database;
this.environment = environment;
this.manager = manager;
@@ -438,6 +491,7 @@ public class FlumePersistentManager exte
this.secretKey = secretKey;
this.setDaemon(true);
this.dbCounter = dbCount;
+ this.lockTimeoutRetryCount = lockTimeoutRetryCount;
}
public void shutdown() {
@@ -474,47 +528,88 @@ public class FlumePersistentManager exte
break;
}
} else {
- Transaction txn = environment.beginTransaction(null, null);
- Cursor cursor = database.openCursor(txn, null);
- try {
- status = cursor.getFirst(key, data, LockMode.RMW);
- while (status == OperationStatus.SUCCESS) {
- final SimpleEvent event = createEvent(data);
- if (event != null) {
+ Exception exception = null;
+ for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
+ exception = null;
+ Transaction txn = null;
+ Cursor cursor = null;
+ try {
+ txn = environment.beginTransaction(null, null);
+ cursor = database.openCursor(txn, null);
+ try {
+ status = cursor.getFirst(key, data, LockMode.RMW);
+ while (status == OperationStatus.SUCCESS) {
+ final SimpleEvent event = createEvent(data);
+ if (event != null) {
+ try {
+ manager.doSend(event);
+ } catch (final Exception ioe) {
+ errors = true;
+ LOGGER.error("Error sending event", ioe);
+ break;
+ }
+ try {
+ cursor.delete();
+ } catch (final Exception ex) {
+ LOGGER.error("Unable to delete event", ex);
+ }
+ }
+ status = cursor.getNext(key, data, LockMode.RMW);
+ }
+ if (cursor != null) {
+ cursor.close();
+ cursor = null;
+ }
+ txn.commit();
+ txn = null;
+ dbCounter.decrementAndGet();
+ exception = null;
+ break;
+ } catch (final LockConflictException lce) {
+ exception = lce;
+ // Fall through and retry.
+ } catch (final Exception ex) {
+ LOGGER.error("Error reading or writing to database", ex);
+ shutdown = true;
+ break;
+ } finally {
+ if (cursor != null) {
+ cursor.close();
+ cursor = null;
+ }
+ if (txn != null) {
+ txn.abort();
+ txn = null;
+ }
+ }
+ } catch (LockConflictException lce) {
+ exception = lce;
+ if (cursor != null) {
try {
- manager.doSend(event);
- } catch (final Exception ioe) {
- errors = true;
- LOGGER.error("Error sending event", ioe);
- break;
+ cursor.close();
+ cursor = null;
+ } catch (Exception ex) {
+ // Ignore exception
}
+ }
+ if (txn != null) {
try {
- cursor.delete();
- } catch (final Exception ex) {
- LOGGER.error("Unable to delete event", ex);
+ txn.abort();
+ txn = null;
+ } catch (Exception ex) {
+ // Ignore exception
}
}
- status = cursor.getNext(key, data, LockMode.RMW);
- }
- if (cursor != null) {
- cursor.close();
- cursor = null;
}
- txn.commit();
- txn = null;
- dbCounter.decrementAndGet();
- } catch (final Exception ex) {
- LOGGER.error("Error reading or writing to database", ex);
- shutdown = true;
- break;
- } finally {
- if (cursor != null) {
- cursor.close();
- }
- if (txn != null) {
- txn.abort();
+ try {
+ Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
+ } catch (InterruptedException ie) {
+ // Ignore the error
}
}
+ if (exception != null) {
+ LOGGER.error("Unable to read or update data base", exception);
+ }
}
if (errors) {
Thread.sleep(manager.delay);
@@ -575,28 +670,89 @@ public class FlumePersistentManager exte
if (!errors) {
cursor.close();
cursor = null;
- final Transaction txn = environment.beginTransaction(null, null);
- try {
- for (final Event event : batch.getEvents()) {
+ Transaction txn = null;
+ Exception exception = null;
+ for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
+ try {
+ txn = environment.beginTransaction(null, null);
try {
- final Map<String, String> headers = event.getHeaders();
- key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
- database.delete(txn, key);
+ for (final Event event : batch.getEvents()) {
+ try {
+ final Map<String, String> headers = event.getHeaders();
+ key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
+ database.delete(txn, key);
+ } catch (final Exception ex) {
+ LOGGER.error("Error deleting key from database", ex);
+ }
+ }
+ txn.commit();
+ long count = dbCounter.get();
+ while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
+ count = dbCounter.get();
+ }
+ exception = null;
+ break;
+ } catch (final LockConflictException lce) {
+ exception = lce;
+ if (cursor != null) {
+ try {
+ cursor.close();
+ cursor = null;
+ } catch (Exception ex) {
+ // Ignore exception
+ }
+ }
+ if (txn != null) {
+ try {
+ txn.abort();
+ txn = null;
+ } catch (Exception ex) {
+ // Ignore exception
+ }
+ }
} catch (final Exception ex) {
- LOGGER.error("Error deleting key from database", ex);
+ LOGGER.error("Unable to commit transaction", ex);
+ if (txn != null) {
+ txn.abort();
+ }
+ }
+ } catch (LockConflictException lce) {
+ exception = lce;
+ if (cursor != null) {
+ try {
+ cursor.close();
+ cursor = null;
+ } catch (Exception ex) {
+ // Ignore exception
+ }
+ }
+ if (txn != null) {
+ try {
+ txn.abort();
+ txn = null;
+ } catch (Exception ex) {
+ // Ignore exception
+ }
+ }
+ } finally {
+ if (cursor != null) {
+ cursor.close();
+ cursor = null;
+ }
+ if (txn != null) {
+ txn.abort();
+ txn = null;
}
}
- txn.commit();
- long count = dbCounter.get();
- while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
- count = dbCounter.get();
- }
- } catch (final Exception ex) {
- LOGGER.error("Unable to commit transaction", ex);
- if (txn != null) {
- txn.abort();
+ try {
+ Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
+ } catch (InterruptedException ie) {
+ // Ignore the error
}
}
+ if (exception != null) {
+ LOGGER.error("Unable to delete events from data base", exception);
+ }
}
} catch (final Exception ex) {
LOGGER.error("Error reading database", ex);
@@ -607,6 +763,7 @@ public class FlumePersistentManager exte
cursor.close();
}
}
+
return errors;
}
Modified: logging/log4j/log4j2/trunk/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java?rev=1521640&r1=1521639&r2=1521640&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java (original)
+++ logging/log4j/log4j2/trunk/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java Tue Sep 10 20:47:28 2013
@@ -134,7 +134,7 @@ public class FlumeAppenderTest {
final FlumeAppender avroAppender = FlumeAppender.createAppender(agents,
null, "false", "Avro", null, "1000", "1000", "1", "1000",
"avro", "false", null, null, null, null, null, "true", "1",
- null, null, null);
+ null, null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
@@ -163,7 +163,7 @@ public class FlumeAppenderTest {
final FlumeAppender avroAppender = FlumeAppender.createAppender(agents,
null, "false", "Avro", null, "1000", "1000", "1", "1000",
"avro", "false", null, null, null, "ReqCtx_", null, "true",
- "1", null, null, null);
+ "1", null, null, null, null);
avroAppender.start();
final Logger eventLogger = (Logger) LogManager.getLogger("EventLogger");
Assert.assertNotNull(eventLogger);
@@ -203,7 +203,7 @@ public class FlumeAppenderTest {
final FlumeAppender avroAppender = FlumeAppender.createAppender(agents,
null, "false", "Avro", null, "1000", "1000", "1", "1000",
"avro", "false", null, null, null, null, null, "true", "1",
- null, null, null);
+ null, null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
@@ -237,7 +237,7 @@ public class FlumeAppenderTest {
final FlumeAppender avroAppender = FlumeAppender.createAppender(agents,
null, "false", "Avro", null, "1000", "1000", "1", "1000",
"avro", "false", null, null, null, null, null, "true", "10",
- null, null, null);
+ null, null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
@@ -271,7 +271,7 @@ public class FlumeAppenderTest {
final FlumeAppender avroAppender = FlumeAppender.createAppender(agents,
null, "false", "Avro", null, "1000", "1000", "1", "1000",
"avro", "false", null, null, null, null, null, "true", "1",
- null, null, null);
+ null, null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
@@ -301,7 +301,7 @@ public class FlumeAppenderTest {
final FlumeAppender avroAppender = FlumeAppender.createAppender(agents,
null, "false", "Avro", null, "1000", "1000", "1", "1000",
"avro", "false", null, null, null, null, null, "true", "1",
- null, null, null);
+ null, null, null, null);
avroAppender.start();
Assert.assertTrue("Appender Not started", avroAppender.isStarted());
avroLogger.addAppender(avroAppender);
@@ -349,7 +349,7 @@ public class FlumeAppenderTest {
final FlumeAppender avroAppender = FlumeAppender.createAppender(agents,
null, "false", "Avro", null, "1000", "1000", "1", "1000",
"avro", "false", null, null, null, null, null, "true", "1",
- null, null, null);
+ null, null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
Modified: logging/log4j/log4j2/trunk/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java?rev=1521640&r1=1521639&r2=1521640&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java (original)
+++ logging/log4j/log4j2/trunk/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java Tue Sep 10 20:47:28 2013
@@ -233,6 +233,102 @@ public class FlumePersistentAppenderTest
Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
body.endsWith("This is a test message"));
}
+
+ @Test
+ public void testMultipleConcurrent() throws InterruptedException, IOException {
+
+ final int eventsCount = 10000;
+
+ Thread writer1 = new WriterThread(0, eventsCount / 4);
+ Thread writer2 = new WriterThread(eventsCount / 4, eventsCount / 2);
+ Thread writer3 = new WriterThread(eventsCount / 2, (3 * eventsCount) / 4);
+ Thread writer4 = new WriterThread((3 * eventsCount) / 4, eventsCount);
+ writer1.start();
+ writer2.start();
+ writer3.start();
+ writer4.start();
+
+
+ final boolean[] fields = new boolean[eventsCount];
+ Thread reader1 = new ReaderThread(0, eventsCount / 4, fields);
+ Thread reader2 = new ReaderThread(eventsCount / 4, eventsCount / 2, fields);
+ Thread reader3 = new ReaderThread(eventsCount / 2, (eventsCount * 3) / 4, fields);
+ Thread reader4 = new ReaderThread((eventsCount * 3) / 4, eventsCount, fields);
+
+ reader1.start();
+ reader2.start();
+ reader3.start();
+ reader4.start();
+
+ writer1.join();
+ writer2.join();
+ writer3.join();
+ writer4.join();
+ reader1.join();
+ reader2.join();
+ reader3.join();
+ reader4.join();
+
+ for (int i = 0; i < eventsCount; ++i) {
+ Assert.assertTrue(
+ "Channel contained event, but not expected message " + i,
+ fields[i]);
+ }
+ }
+
+ private class WriterThread extends Thread {
+
+ private final int start;
+ private final int stop;
+
+ public WriterThread(int start, int stop) {
+ this.start = start;
+ this.stop = stop;
+ }
+
+ public void run() {
+ for (int i = start; i < stop; ++i) {
+ final StructuredDataMessage msg = new StructuredDataMessage(
+ "Test", "Test Multiple " + i, "Test");
+ msg.put("counter", Integer.toString(i));
+ EventLogger.logEvent(msg);
+ }
+ }
+ }
+
+ private class ReaderThread extends Thread {
+ private final int start;
+ private final int stop;
+ private final boolean[] fields;
+
+ private ReaderThread(int start, int stop, boolean[] fields) {
+ this.start = start;
+ this.stop = stop;
+ this.fields = fields;
+ }
+ public void run() {
+
+ for (int i = start; i < stop; ++i) {
+ Event event = primary.poll();
+ while (event == null) {
+ event = primary.poll();
+ }
+
+ Assert.assertNotNull("Received " + i + " events. Event "
+ + (i + 1) + " is null", event);
+ final String value = event.getHeaders().get("counter");
+ Assert.assertNotNull("Missing counter", value);
+ final int counter = Integer.parseInt(value);
+ if (fields[counter]) {
+ Assert.fail("Duplicate event");
+ } else {
+ fields[counter] = true;
+ }
+
+ }
+ }
+ }
+
/*
@Test
public void testPerformance() throws Exception {
@@ -319,7 +415,7 @@ public class FlumePersistentAppenderTest
public Status appendBatch(final List<AvroFlumeEvent> events) throws AvroRemoteException {
Preconditions.checkState(eventQueue.addAll(events));
for (final AvroFlumeEvent event : events) {
- // System.out.println("Received event " + event.getHeaders().get(new org.apache.avro.util.Utf8(FlumeEvent.GUID)));
+ // System.out.println("Received event " + event.getHeaders().get(new org.apache.avro.util.Utf8(FlumeEvent.GUID)));
}
return Status.OK;
}
Modified: logging/log4j/log4j2/trunk/src/changes/changes.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/src/changes/changes.xml?rev=1521640&r1=1521639&r2=1521640&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/src/changes/changes.xml (original)
+++ logging/log4j/log4j2/trunk/src/changes/changes.xml Tue Sep 10 20:47:28 2013
@@ -40,6 +40,9 @@
reversed (previous "true"s should become "false"s, and vice versa). Since this was an undocumented attribute up
until now, it's unlikely this change will affect any users.
</action>
+ <action issue="LOG4J2-391" dev="rgoers" type="fix" due-to="Kamal Bahadur">
+ FlumePersistentManager now handles LockConflictExceptions in Berkeley Db.
+ </action>
<action issue="LOG4J2-338" dev="rgoers" type="add" due-to="Tibor Benke">
Add TLSAppender. Also added missing license headers to several files.
</action>
Modified: logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml?rev=1521640&r1=1521639&r2=1521640&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml (original)
+++ logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml Tue Sep 10 20:47:28 2013
@@ -850,6 +850,12 @@
<td>The Layout to use to format the LogEvent. If no layout is specified RFC5424Layout will be used.</td>
</tr>
<tr>
+ <td>lockTimeoutRetries</td>
+ <td>integer</td>
+ <td>The number of times to retry if a LockConflictException occurs while writing to Berkeley DB. The
+ default is 5.</td>
+ </tr>
+ <tr>
<td>maxDelay</td>
<td>integer</td>
<td>The maximum number of seconds to wait for batchSize events before publishing the batch.</td>