You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by rg...@apache.org on 2012/08/23 07:47:11 UTC
svn commit: r1376372 - in /logging/log4j/log4j2/trunk:
flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/
flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/
flume-ng/src/test/resources/ src/site/xdoc/manual/
Author: rgoers
Date: Thu Aug 23 05:47:10 2012
New Revision: 1376372
URL: http://svn.apache.org/viewvc?rev=1376372&view=rev
Log:
Add ability to specify data directory with Agents. Give each source its own channel in the failover tests.
Modified:
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java
logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java
logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml
logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml
Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java?rev=1376372&r1=1376371&r2=1376372&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java Thu Aug 23 05:47:10 2012
@@ -126,6 +126,7 @@ public final class FlumeAppender extends
public static FlumeAppender createAppender(@PluginElement("agents") Agent[] agents,
@PluginElement("properties") Property[] properties,
@PluginAttr("embedded") String embedded,
+ @PluginAttr("dataDir") String dataDir,
@PluginAttr("reconnectionDelay") String delay,
@PluginAttr("agentRetries") String agentRetries,
@PluginAttr("name") String name,
@@ -163,7 +164,7 @@ public final class FlumeAppender extends
FlumeManager manager;
if (embed) {
- manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount);
+ manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
} else {
if (agents == null || agents.length == 0) {
LOGGER.debug("No agents provided, using defaults");
Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java?rev=1376372&r1=1376371&r2=1376372&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java Thu Aug 23 05:47:10 2012
@@ -41,6 +41,8 @@ public class FlumeEmbeddedManager extend
protected static final String SOURCE_NAME = "log4j-source";
+ private static final String LINE_SEP = System.getProperty("file.separator");
+
private final Log4jEventSource source;
private final String shortName;
@@ -68,7 +70,8 @@ public class FlumeEmbeddedManager extend
* @param batchSize The number of events to include in a batch.
* @return A FlumeAvroManager.
*/
- public static FlumeEmbeddedManager getManager(String name, Agent[] agents, Property[] properties, int batchSize) {
+ public static FlumeEmbeddedManager getManager(String name, Agent[] agents, Property[] properties, int batchSize,
+ String dataDir) {
if (batchSize <= 0) {
batchSize = 1;
@@ -120,7 +123,7 @@ public class FlumeEmbeddedManager extend
}
}
return (FlumeEmbeddedManager) getManager(sb.toString(), factory,
- new FactoryData(name, agents, properties, batchSize));
+ new FactoryData(name, agents, properties, batchSize, dataDir));
}
public void send(FlumeEvent event, int delay, int retries) {
@@ -139,6 +142,7 @@ public class FlumeEmbeddedManager extend
private Agent[] agents;
private Property[] properties;
private int batchSize;
+ private String dataDir;
private String name;
/**
@@ -147,12 +151,14 @@ public class FlumeEmbeddedManager extend
* @param agents The agents.
* @param properties The Flume configuration properties.
* @param batchSize The number of events to include in a batch.
+ * @param dataDir The directory where Flume should write its data.
*/
- public FactoryData(String name, Agent[] agents, Property[] properties, int batchSize) {
+ public FactoryData(String name, Agent[] agents, Property[] properties, int batchSize, String dataDir) {
this.name = name;
this.agents = agents;
this.batchSize = batchSize;
this.properties = properties;
+ this.dataDir = dataDir;
}
}
@@ -171,7 +177,8 @@ public class FlumeEmbeddedManager extend
public FlumeEmbeddedManager createManager(String name, FactoryData data) {
try {
DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
- Properties props = createProperties(data.name, data.agents, data.properties, data.batchSize);
+ Properties props = createProperties(data.name, data.agents, data.properties, data.batchSize,
+ data.dataDir);
FlumeConfigurationBuilder builder = new FlumeConfigurationBuilder();
NodeConfiguration conf = builder.load(data.name, props, nodeManager);
@@ -187,7 +194,8 @@ public class FlumeEmbeddedManager extend
return null;
}
- private Properties createProperties(String name, Agent[] agents, Property[] properties, int batchSize) {
+ private Properties createProperties(String name, Agent[] agents, Property[] properties, int batchSize,
+ String dataDir) {
Properties props = new Properties();
if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
@@ -205,6 +213,14 @@ public class FlumeEmbeddedManager extend
props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", sourceType);
props.put(name + ".channels", "file");
props.put(name + ".channels.file.type", "file");
+ if (dataDir != null && dataDir.length() > 0) {
+ if (!dataDir.endsWith(LINE_SEP)) {
+ dataDir = dataDir + LINE_SEP;
+ }
+
+ props.put(name + ".channels.file.checkpointDir", dataDir + "checkpoint");
+ props.put(name + ".channels.file.dataDirs", dataDir + "data");
+ }
StringBuilder sb = new StringBuilder();
String leading = "";
Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java?rev=1376372&r1=1376371&r2=1376372&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java Thu Aug 23 05:47:10 2012
@@ -51,7 +51,7 @@ public class FlumeEvent extends SimpleEv
private static final String GUID = "guId";
- private static final String TIMESTAMP = "timestamp";;
+ private static final String TIMESTAMP = "timeStamp";;
private final LogEvent event;
Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java?rev=1376372&r1=1376371&r2=1376372&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java Thu Aug 23 05:47:10 2012
@@ -60,5 +60,7 @@ public class Log4jEventSource extends Ab
logger.warn("Unabled to process event {}" + event, ex);
throw ex;
}
+ sourceCounter.incrementAppendAcceptedCount();
+ sourceCounter.incrementEventAcceptedCount();
}
}
Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java?rev=1376372&r1=1376371&r2=1376372&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java Thu Aug 23 05:47:10 2012
@@ -123,8 +123,8 @@ public class FlumeAppenderTest {
@Test
public void testLog4jAvroAppender() throws InterruptedException, IOException {
Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
- FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "100", "3", "avro", "false",
- null, null, null, null, null, "true", "1", null, null, null);
+ FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
+ "false", null, null, null, null, null, "true", "1", null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
@@ -150,8 +150,8 @@ public class FlumeAppenderTest {
@Test
public void testMultiple() throws InterruptedException, IOException {
Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
- FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "100", "3", "avro", "false",
- null, null, null, null, null, "true", "1", null, null, null);
+ FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
+ "false", null, null, null, null, null, "true", "1", null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
@@ -180,8 +180,8 @@ public class FlumeAppenderTest {
@Test
public void testBatch() throws InterruptedException, IOException {
Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
- FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "100", "3", "avro", "false",
- null, null, null, null, null, "true", "10", null, null, null);
+ FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
+ "false", null, null, null, null, null, "true", "10", null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
@@ -211,8 +211,8 @@ public class FlumeAppenderTest {
@Test
public void testConnectionRefused() {
Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
- FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "100", "3", "avro", "false",
- null, null, null, null, null, "true", "1", null, null, null);
+ FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
+ "false", null, null, null, null, null, "true", "1", null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
@@ -238,8 +238,8 @@ public class FlumeAppenderTest {
String altPort = Integer.toString(Integer.parseInt(testPort) + 1);
Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort),
Agent.createAgent("localhost", altPort)};
- FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "100", "3", "avro", "false",
- null, null, null, null, null, "true", "1", null, null, null);
+ FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
+ "false", null, null, null, null, null, "true", "1", null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java?rev=1376372&r1=1376371&r2=1376372&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java Thu Aug 23 05:47:10 2012
@@ -46,6 +46,7 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
@@ -65,7 +66,8 @@ public class FlumeEmbeddedAgentTest {
private AvroSource primarySource;
private AvroSource altSource;
- private Channel channel;
+ private Channel primaryChannel;
+ private Channel alternateChannel;
private String testPort;
private String altPort;
@@ -82,13 +84,19 @@ public class FlumeEmbeddedAgentTest {
@Before
public void setUp() throws Exception {
+ File file = new File("target/file-channel");
+ boolean result = deleteFiles(file);
primarySource = new AvroSource();
primarySource.setName("Primary");
altSource = new AvroSource();
altSource.setName("Alternate");
- channel = new MemoryChannel();
+ primaryChannel = new MemoryChannel();
+ primaryChannel.setName("Primary Memory");
+ alternateChannel = new MemoryChannel();
+ alternateChannel.setName("Alternate Memory");
- Configurables.configure(channel, new Context());
+ Configurables.configure(primaryChannel, new Context());
+ Configurables.configure(alternateChannel, new Context());
/*
* Clear out all other appenders associated with this logger to ensure we're
@@ -107,18 +115,24 @@ public class FlumeEmbeddedAgentTest {
Configurables.configure(altSource, context);
List<Channel> channels = new ArrayList<Channel>();
- channels.add(channel);
+ channels.add(primaryChannel);
- ChannelSelector cs = new ReplicatingChannelSelector();
- cs.setChannels(channels);
+ ChannelSelector primaryCS = new ReplicatingChannelSelector();
+ primaryCS.setChannels(channels);
- primarySource.setChannelProcessor(new ChannelProcessor(cs));
- altSource.setChannelProcessor(new ChannelProcessor(cs));
+ List<Channel> altChannels = new ArrayList<Channel>();
+ altChannels.add(alternateChannel);
+
+ ChannelSelector alternateCS = new ReplicatingChannelSelector();
+ alternateCS.setChannels(altChannels);
+
+ primarySource.setChannelProcessor(new ChannelProcessor(primaryCS));
+ altSource.setChannelProcessor(new ChannelProcessor(alternateCS));
primarySource.start();
altSource.start();
- Assert.assertTrue("Reached start or error", LifecycleController.waitForOneOf(
+ Assert.assertTrue("Reached start or error", LifecycleController.waitForOneOf(
primarySource, LifecycleState.START_OR_ERROR));
Assert.assertEquals("Server is started", LifecycleState.START, primarySource.getLifecycleState());
System.setProperty(XMLConfigurationFactory.CONFIGURATION_FILE_PROPERTY, CONFIG);
@@ -132,10 +146,12 @@ public class FlumeEmbeddedAgentTest {
ctx.reconfigure();
primarySource.stop();
altSource.stop();
- Assert.assertTrue("Reached stop or error",
- LifecycleController.waitForOneOf(primarySource, LifecycleState.STOP_OR_ERROR));
- Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+ Assert.assertTrue("Reached stop or error",
+ LifecycleController.waitForOneOf(primarySource, LifecycleState.STOP_OR_ERROR));
+ Assert.assertEquals("Server is stopped", LifecycleState.STOP,
primarySource.getLifecycleState());
+ File file = new File("target/file-channel");
+ boolean result = deleteFiles(file);
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
Set<ObjectName> names = server.queryNames(new ObjectName("org.apache.flume.*:*"), null);
for (ObjectName name : names) {
@@ -153,18 +169,18 @@ public class FlumeEmbeddedAgentTest {
StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Log4j", "Test");
EventLogger.logEvent(msg);
- Transaction transaction = channel.getTransaction();
+ Transaction transaction = primaryChannel.getTransaction();
transaction.begin();
- Event event = channel.take();
- Assert.assertNotNull(event);
+ Event event = primaryChannel.take();
+ Assert.assertNotNull(event);
String body = getBody(event);
- Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
+ Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
body.endsWith("Test Log4j"));
- transaction.commit();
- transaction.close();
+ transaction.commit();
+ transaction.close();
- primarySource.stop();
+ primarySource.stop();
}
@Test
@@ -175,15 +191,15 @@ public class FlumeEmbeddedAgentTest {
EventLogger.logEvent(msg);
}
for (int i = 0; i < 10; ++i) {
- Transaction transaction = channel.getTransaction();
+ Transaction transaction = primaryChannel.getTransaction();
transaction.begin();
- Event event = channel.take();
+ Event event = primaryChannel.take();
Assert.assertNotNull(event);
String body = getBody(event);
String expected = "Test Multiple " + i;
- Assert.assertTrue("Channel contained event, but not expected message. Expected: " + expected +
- " Received: " + body, body.endsWith(expected));
+ Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
+ body.endsWith(expected));
transaction.commit();
transaction.close();
}
@@ -191,6 +207,7 @@ public class FlumeEmbeddedAgentTest {
primarySource.stop();
}
+
@Test
public void testFailover() throws InterruptedException, IOException {
Logger logger = LogManager.getLogger("testFailover");
@@ -200,15 +217,15 @@ public class FlumeEmbeddedAgentTest {
EventLogger.logEvent(msg);
}
for (int i = 0; i < 10; ++i) {
- Transaction transaction = channel.getTransaction();
+ Transaction transaction = primaryChannel.getTransaction();
transaction.begin();
- Event event = channel.take();
+ Event event = primaryChannel.take();
Assert.assertNotNull(event);
String body = getBody(event);
String expected = "Test Primary " + i;
- Assert.assertTrue("Channel contained event, but not expected message. Expected: " + expected +
- " Received: " + body, body.endsWith(expected));
+ Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
+ body.endsWith(expected));
transaction.commit();
transaction.close();
}
@@ -221,17 +238,17 @@ public class FlumeEmbeddedAgentTest {
EventLogger.logEvent(msg);
}
for (int i = 0; i < 10; ++i) {
- Transaction transaction = channel.getTransaction();
+ Transaction transaction = alternateChannel.getTransaction();
transaction.begin();
- Event event = channel.take();
+ Event event = alternateChannel.take();
Assert.assertNotNull(event);
String body = getBody(event);
String expected = "Test Alternate " + i;
/* When running in Gump Flume consistently returns the last event from the primary channel after
- the failover, which fails this test
+ the failover, which fails this test */
Assert.assertTrue("Channel contained event, but not expected message. Expected: " + expected +
- " Received: " + body, body.endsWith(expected)); */
+ " Received: " + body, body.endsWith(expected));
transaction.commit();
transaction.close();
}
@@ -240,12 +257,28 @@ public class FlumeEmbeddedAgentTest {
private String getBody(Event event) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody()));
- int n = 0;
- while (-1 != (n = is.read())) {
- baos.write(n);
+ InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody()));
+ int n = 0;
+ while (-1 != (n = is.read())) {
+ baos.write(n);
+ }
+ return new String(baos.toByteArray());
+
+ }
+
+ private boolean deleteFiles(File file) {
+ boolean result = true;
+ if (file.isDirectory()) {
+
+ File[] files = file.listFiles();
+ for (File child : files) {
+ result &= deleteFiles(child);
}
- return new String(baos.toByteArray());
+ } else if (!file.exists()) {
+ return false;
+ }
+
+ return result &= file.delete();
}
}
Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java?rev=1376372&r1=1376371&r2=1376372&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java Thu Aug 23 05:47:10 2012
@@ -46,6 +46,7 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
@@ -65,11 +66,11 @@ public class FlumeEmbeddedAppenderTest {
private AvroSource primarySource;
private AvroSource altSource;
- private Channel channel;
+ private Channel primaryChannel;
+ private Channel alternateChannel;
private String testPort;
private String altPort;
- private int counter;
@BeforeClass
public static void setupClass() {
@@ -83,15 +84,19 @@ public class FlumeEmbeddedAppenderTest {
@Before
public void setUp() throws Exception {
+ File file = new File("target/file-channel");
+ boolean result = deleteFiles(file);
primarySource = new AvroSource();
primarySource.setName("Primary");
altSource = new AvroSource();
altSource.setName("Alternate");
- channel = new MemoryChannel();
- channel.setName("Memory");
- ++counter;
+ primaryChannel = new MemoryChannel();
+ primaryChannel.setName("Primary Memory");
+ alternateChannel = new MemoryChannel();
+ alternateChannel.setName("Alternate Memory");
- Configurables.configure(channel, new Context());
+ Configurables.configure(primaryChannel, new Context());
+ Configurables.configure(alternateChannel, new Context());
/*
* Clear out all other appenders associated with this logger to ensure we're
@@ -110,13 +115,19 @@ public class FlumeEmbeddedAppenderTest {
Configurables.configure(altSource, context);
List<Channel> channels = new ArrayList<Channel>();
- channels.add(channel);
+ channels.add(primaryChannel);
- ChannelSelector cs = new ReplicatingChannelSelector();
- cs.setChannels(channels);
+ ChannelSelector primaryCS = new ReplicatingChannelSelector();
+ primaryCS.setChannels(channels);
- primarySource.setChannelProcessor(new ChannelProcessor(cs));
- altSource.setChannelProcessor(new ChannelProcessor(cs));
+ List<Channel> altChannels = new ArrayList<Channel>();
+ altChannels.add(alternateChannel);
+
+ ChannelSelector alternateCS = new ReplicatingChannelSelector();
+ alternateCS.setChannels(altChannels);
+
+ primarySource.setChannelProcessor(new ChannelProcessor(primaryCS));
+ altSource.setChannelProcessor(new ChannelProcessor(alternateCS));
primarySource.start();
altSource.start();
@@ -139,6 +150,8 @@ public class FlumeEmbeddedAppenderTest {
LifecycleController.waitForOneOf(primarySource, LifecycleState.STOP_OR_ERROR));
Assert.assertEquals("Server is stopped", LifecycleState.STOP,
primarySource.getLifecycleState());
+ File file = new File("target/file-channel");
+ boolean result = deleteFiles(file);
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
Set<ObjectName> names = server.queryNames(new ObjectName("org.apache.flume.*:*"), null);
for (ObjectName name : names) {
@@ -156,10 +169,10 @@ public class FlumeEmbeddedAppenderTest {
StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Log4j", "Test");
EventLogger.logEvent(msg);
- Transaction transaction = channel.getTransaction();
+ Transaction transaction = primaryChannel.getTransaction();
transaction.begin();
- Event event = channel.take();
+ Event event = primaryChannel.take();
Assert.assertNotNull(event);
String body = getBody(event);
Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
@@ -178,10 +191,10 @@ public class FlumeEmbeddedAppenderTest {
EventLogger.logEvent(msg);
}
for (int i = 0; i < 10; ++i) {
- Transaction transaction = channel.getTransaction();
+ Transaction transaction = primaryChannel.getTransaction();
transaction.begin();
- Event event = channel.take();
+ Event event = primaryChannel.take();
Assert.assertNotNull(event);
String body = getBody(event);
String expected = "Test Multiple " + i;
@@ -204,10 +217,10 @@ public class FlumeEmbeddedAppenderTest {
EventLogger.logEvent(msg);
}
for (int i = 0; i < 10; ++i) {
- Transaction transaction = channel.getTransaction();
+ Transaction transaction = primaryChannel.getTransaction();
transaction.begin();
- Event event = channel.take();
+ Event event = primaryChannel.take();
Assert.assertNotNull(event);
String body = getBody(event);
String expected = "Test Primary " + i;
@@ -225,17 +238,17 @@ public class FlumeEmbeddedAppenderTest {
EventLogger.logEvent(msg);
}
for (int i = 0; i < 10; ++i) {
- Transaction transaction = channel.getTransaction();
+ Transaction transaction = alternateChannel.getTransaction();
transaction.begin();
- Event event = channel.take();
+ Event event = alternateChannel.take();
Assert.assertNotNull(event);
String body = getBody(event);
String expected = "Test Alternate " + i;
/* When running in Gump Flume consistently returns the last event from the primary channel after
- the failover, which fails this test
+ the failover, which fails this test */
Assert.assertTrue("Channel contained event, but not expected message. Expected: " + expected +
- " Received: " + body, body.endsWith(expected)); */
+ " Received: " + body, body.endsWith(expected));
transaction.commit();
transaction.close();
}
@@ -252,4 +265,20 @@ public class FlumeEmbeddedAppenderTest {
return new String(baos.toByteArray());
}
+
+ private boolean deleteFiles(File file) {
+ boolean result = true;
+ if (file.isDirectory()) {
+
+ File[] files = file.listFiles();
+ for (File child : files) {
+ result &= deleteFiles(child);
+ }
+
+ } else if (!file.exists()) {
+ return false;
+ }
+
+ return result &= file.delete();
+ }
}
Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml?rev=1376372&r1=1376371&r2=1376372&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml Thu Aug 23 05:47:10 2012
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="warn" name="MyApp" packages="">
<appenders>
- <Flume name="eventLogger" suppressExceptions="false" compress="true" embedded="true">
+ <Flume name="eventLogger" suppressExceptions="false" compress="true" embedded="true" dataDir="target/file-channel">
<Agent host="localhost" port="12345"/>
<Agent host="localhost" port="12346"/>
<RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>
Modified: logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml?rev=1376372&r1=1376371&r2=1376372&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml (original)
+++ logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml Thu Aug 23 05:47:10 2012
@@ -333,6 +333,12 @@
<td>When set to true the message body will be compressed using gzip</td>
</tr>
<tr>
+ <td>dataDir</td>
+ <td>String</td>
+ <td>Directory where the Flume write-ahead log should be written. Valid only when embedded is set
+ to true and Agent elements are used instead of Property elements.</td>
+ </tr>
+ <tr>
<td>embedded</td>
<td>boolean</td>
<td>When set to true the embedded Flume agent will be used. When Agent elements are used the events