You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by rg...@apache.org on 2015/06/15 03:17:37 UTC
logging-log4j2 git commit: LOG4J2-1044 - Support batchSize in
FlumeAvroManager.
Repository: logging-log4j2
Updated Branches:
refs/heads/master 4c6d636b6 -> 3780d00c0
LOG4J2-1044 - Support batchSize in FlumeAvroManager.
Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/3780d00c
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/3780d00c
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/3780d00c
Branch: refs/heads/master
Commit: 3780d00c03770e4357d9def9a0460d6b665c90e3
Parents: 4c6d636
Author: Ralph Goers <rg...@nextiva.com>
Authored: Sun Jun 14 18:17:10 2015 -0700
Committer: Ralph Goers <rg...@nextiva.com>
Committed: Sun Jun 14 18:17:10 2015 -0700
----------------------------------------------------------------------
.../log4j/flume/appender/FlumeAppender.java | 4 +-
.../log4j/flume/appender/FlumeAvroManager.java | 71 ++++++++++++++------
.../flume/appender/FlumePersistentManager.java | 15 ++---
.../log4j/flume/appender/FlumeAppenderTest.java | 41 +++++++++++
src/changes/changes.xml | 3 +
5 files changed, 101 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/3780d00c/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
----------------------------------------------------------------------
diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
index c37ea1d..1c466ef 100644
--- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
+++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
@@ -247,7 +247,7 @@ public final class FlumeAppender extends AbstractAppender implements FlumeEventF
LOGGER.debug("No agents provided, using defaults");
agents = new Agent[] {Agent.createAgent(null, null)};
}
- manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeoutMillis, reqTimeoutMillis);
+ manager = FlumeAvroManager.getManager(name, agents, batchCount, delayMillis, retries, connectTimeoutMillis, reqTimeoutMillis);
break;
case PERSISTENT:
if (agents == null || agents.length == 0) {
@@ -263,7 +263,7 @@ public final class FlumeAppender extends AbstractAppender implements FlumeEventF
LOGGER.debug("No agents provided, using defaults");
agents = new Agent[] {Agent.createAgent(null, null)};
}
- manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeoutMillis, reqTimeoutMillis);
+ manager = FlumeAvroManager.getManager(name, agents, batchCount, delayMillis, retries, connectTimeoutMillis, reqTimeoutMillis);
}
if (manager == null) {
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/3780d00c/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
----------------------------------------------------------------------
diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
index a4d52b6..4f5bd1a 100644
--- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
+++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
@@ -38,6 +38,9 @@ public class FlumeAvroManager extends AbstractFlumeManager {
private final int batchSize;
+ private final long delayNanos;
+ private final int delayMillis;
+
private final int retries;
private final int connectTimeoutMillis;
@@ -48,6 +51,9 @@ public class FlumeAvroManager extends AbstractFlumeManager {
private RpcClient rpcClient = null;
+ private BatchEvent batchEvent = new BatchEvent();
+ private long nextSend = 0;
+
/**
* Constructor
* @param name The unique name of this manager.
@@ -59,10 +65,12 @@ public class FlumeAvroManager extends AbstractFlumeManager {
*
*/
protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
- final int retries, final int connectTimeout, final int requestTimeout) {
+ final int delayMillis, final int retries, final int connectTimeout, final int requestTimeout) {
super(name);
this.agents = agents;
this.batchSize = batchSize;
+ this.delayMillis = delayMillis;
+ this.delayNanos = delayMillis * 1000000;
this.retries = retries;
this.connectTimeoutMillis = connectTimeout;
this.requestTimeoutMillis = requestTimeout;
@@ -74,12 +82,13 @@ public class FlumeAvroManager extends AbstractFlumeManager {
* @param name The name of the manager.
* @param agents The agents to use.
* @param batchSize The number of events to include in a batch.
+ * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
* @param retries The number of times to retry connecting before giving up.
* @param connectTimeoutMillis The connection timeout in ms.
* @param requestTimeoutMillis The request timeout in ms.
* @return A FlumeAvroManager.
*/
- public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize,
+ public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, int delayMillis,
final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
if (agents == null || agents.length == 0) {
throw new IllegalArgumentException("At least one agent is required");
@@ -100,7 +109,7 @@ public class FlumeAvroManager extends AbstractFlumeManager {
}
sb.append(']');
return getManager(sb.toString(), factory,
- new FactoryData(name, agents, batchSize, retries, connectTimeoutMillis, requestTimeoutMillis));
+ new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis, requestTimeoutMillis));
}
/**
@@ -135,6 +144,10 @@ public class FlumeAvroManager extends AbstractFlumeManager {
return batchSize;
}
+ public int getDelayMillis() {
+ return delayMillis;
+ }
+
public synchronized void send(final BatchEvent events) {
if (rpcClient == null) {
rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
@@ -162,26 +175,38 @@ public class FlumeAvroManager extends AbstractFlumeManager {
@Override
public synchronized void send(final Event event) {
- if (rpcClient == null) {
- rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
- }
+ if (batchSize == 1) {
+ if (rpcClient == null) {
+ rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
+ }
- if (rpcClient != null) {
- try {
- rpcClient.append(event);
- } catch (final Exception ex) {
- rpcClient.close();
- rpcClient = null;
+ if (rpcClient != null) {
+ try {
+ rpcClient.append(event);
+ } catch (final Exception ex) {
+ rpcClient.close();
+ rpcClient = null;
+ final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
+ agents[current].getPort();
+ LOGGER.warn(msg, ex);
+ throw new AppenderLoggingException("No Flume agents are available");
+ }
+ } else {
final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
- agents[current].getPort();
- LOGGER.warn(msg, ex);
+ agents[current].getPort();
+ LOGGER.warn(msg);
throw new AppenderLoggingException("No Flume agents are available");
}
} else {
- final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
- agents[current].getPort();
- LOGGER.warn(msg);
- throw new AppenderLoggingException("No Flume agents are available");
+ batchEvent.addEvent(event);
+ int count = batchEvent.getEvents().size();
+ if (count == 1) {
+ nextSend = System.nanoTime() + delayNanos;
+ }
+ if (count >= batchSize || System.nanoTime() >= nextSend) {
+ send(batchEvent);
+ batchEvent = new BatchEvent();
+ }
}
}
@@ -248,6 +273,7 @@ public class FlumeAvroManager extends AbstractFlumeManager {
private final String name;
private final Agent[] agents;
private final int batchSize;
+ private final int delayMillis;
private final int retries;
private final int conntectTimeoutMillis;
private final int requestTimeoutMillis;
@@ -258,11 +284,12 @@ public class FlumeAvroManager extends AbstractFlumeManager {
* @param agents The agents.
* @param batchSize The number of events to include in a batch.
*/
- public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
- final int connectTimeoutMillis, final int requestTimeoutMillis) {
+ public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis,
+ final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
this.name = name;
this.agents = agents;
this.batchSize = batchSize;
+ this.delayMillis = delayMillis;
this.retries = retries;
this.conntectTimeoutMillis = connectTimeoutMillis;
this.requestTimeoutMillis = requestTimeoutMillis;
@@ -284,8 +311,8 @@ public class FlumeAvroManager extends AbstractFlumeManager {
public FlumeAvroManager createManager(final String name, final FactoryData data) {
try {
- return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.retries,
- data.conntectTimeoutMillis, data.requestTimeoutMillis);
+ return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis,
+ data.retries, data.conntectTimeoutMillis, data.requestTimeoutMillis);
} catch (final Exception ex) {
LOGGER.error("Could not create FlumeAvroManager", ex);
}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/3780d00c/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
----------------------------------------------------------------------
diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
index e643d31..c0f8879 100644
--- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
+++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
@@ -90,8 +90,6 @@ public class FlumePersistentManager extends FlumeAvroManager {
private final SecretKey secretKey;
- private final int delayMillis;
-
private final int lockTimeoutRetryCount;
private final ExecutorService threadPool;
@@ -118,8 +116,7 @@ public class FlumePersistentManager extends FlumeAvroManager {
final int requestTimeout, final int delay, final Database database,
final Environment environment, final SecretKey secretKey,
final int lockTimeoutRetryCount) {
- super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
- this.delayMillis = delay;
+ super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
this.database = database;
this.environment = environment;
dbCount.set(database.count());
@@ -515,14 +512,14 @@ public class FlumePersistentManager extends FlumeAvroManager {
@Override
public void run() {
- LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.delayMillis);
- long nextBatchMillis = System.currentTimeMillis() + manager.delayMillis;
+ LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
+ long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
while (!shutdown) {
final long nowMillis = System.currentTimeMillis();
final long dbCount = database.count();
dbCounter.set(dbCount);
if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
- nextBatchMillis = nowMillis + manager.delayMillis;
+ nextBatchMillis = nowMillis + manager.getDelayMillis();
try {
boolean errors = false;
final DatabaseEntry key = new DatabaseEntry();
@@ -621,7 +618,7 @@ public class FlumePersistentManager extends FlumeAvroManager {
}
}
if (errors) {
- Thread.sleep(manager.delayMillis);
+ Thread.sleep(manager.getDelayMillis());
continue;
}
} catch (final Exception ex) {
@@ -629,7 +626,7 @@ public class FlumePersistentManager extends FlumeAvroManager {
}
} else {
if (nextBatchMillis <= nowMillis) {
- nextBatchMillis = nowMillis + manager.delayMillis;
+ nextBatchMillis = nowMillis + manager.getDelayMillis();
}
try {
final long interval = nextBatchMillis - nowMillis;
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/3780d00c/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
----------------------------------------------------------------------
diff --git a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
index 7c2e9b9..2715abe 100644
--- a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
+++ b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
@@ -217,6 +217,47 @@ public class FlumeAppenderTest {
eventSource.stop();
}
+
+ @Test
+ public void testIncompleteBatch() throws IOException {
+ final Agent[] agents = new Agent[] { Agent.createAgent("localhost",
+ testPort) };
+ final FlumeAppender avroAppender = FlumeAppender.createAppender(agents,
+ null, "false", "Avro", null, "1000", "1000", "1", "500",
+ "avro", "false", null, null, null, null, null, "true", "10",
+ null, null, null, null);
+ avroAppender.start();
+ avroLogger.addAppender(avroAppender);
+ avroLogger.setLevel(Level.ALL);
+
+ Assert.assertNotNull(avroLogger);
+
+ avroLogger.info("Test message 0");
+
+ final Transaction transaction = channel.getTransaction();
+ transaction.begin();
+
+ Event event = channel.take();
+ Assert.assertNull("Received event", event);
+
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ie) {
+ }
+
+ avroLogger.info("Test message 1");
+ for (int i = 0; i < 2; ++i) {
+ event = channel.take();
+ Assert.assertNotNull("No event for item " + i, event);
+ Assert.assertTrue("Channel contained event, but not expected message",
+ getBody(event).endsWith("Test message " + i));
+ }
+ transaction.commit();
+ transaction.close();
+
+ eventSource.stop();
+ }
+
@Test
public void testBatch() throws IOException {
final Agent[] agents = new Agent[] { Agent.createAgent("localhost",
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/3780d00c/src/changes/changes.xml
----------------------------------------------------------------------
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index 849457b..61cd933 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -24,6 +24,9 @@
</properties>
<body>
<release version="2.4" date="2015-MM-DD" description="GA Release 2.4">
+ <action issue="LOG4J2=1044" dev="rgoers" type="update">
+ Support batchSize in FlumeAvroManager.
+ </action>
<action issue="LOG4J2-767" dev="ggregory" type="add" due-to="Mikael Ståldal">
New module for Liquibase integration.
</action>