You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by rg...@apache.org on 2012/08/23 07:47:11 UTC

svn commit: r1376372 - in /logging/log4j/log4j2/trunk: flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/ flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/ flume-ng/src/test/resources/ src/site/xdoc/manual/

Author: rgoers
Date: Thu Aug 23 05:47:10 2012
New Revision: 1376372

URL: http://svn.apache.org/viewvc?rev=1376372&view=rev
Log:
Add ability to specify data directory with Agents. Give each source its own channel in the failover tests.

Modified:
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java
    logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
    logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java
    logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
    logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml
    logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java?rev=1376372&r1=1376371&r2=1376372&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java Thu Aug 23 05:47:10 2012
@@ -126,6 +126,7 @@ public final class FlumeAppender extends
     public static FlumeAppender createAppender(@PluginElement("agents") Agent[] agents,
                                                    @PluginElement("properties") Property[] properties,
                                                    @PluginAttr("embedded") String embedded,
+                                                   @PluginAttr("dataDir") String dataDir,
                                                    @PluginAttr("reconnectionDelay") String delay,
                                                    @PluginAttr("agentRetries") String agentRetries,
                                                    @PluginAttr("name") String name,
@@ -163,7 +164,7 @@ public final class FlumeAppender extends
         FlumeManager manager;
 
         if (embed) {
-            manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount);
+            manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
         } else {
             if (agents == null || agents.length == 0) {
                 LOGGER.debug("No agents provided, using defaults");

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java?rev=1376372&r1=1376371&r2=1376372&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java Thu Aug 23 05:47:10 2012
@@ -41,6 +41,8 @@ public class FlumeEmbeddedManager extend
 
     protected static final String SOURCE_NAME = "log4j-source";
 
+    private static final String LINE_SEP = System.getProperty("file.separator");
+
     private final Log4jEventSource source;
 
     private final String shortName;
@@ -68,7 +70,8 @@ public class FlumeEmbeddedManager extend
      * @param batchSize The number of events to include in a batch.
      * @return A FlumeAvroManager.
      */
-    public static FlumeEmbeddedManager getManager(String name, Agent[] agents, Property[] properties, int batchSize) {
+    public static FlumeEmbeddedManager getManager(String name, Agent[] agents, Property[] properties, int batchSize,
+                                                  String dataDir) {
 
         if (batchSize <= 0) {
             batchSize = 1;
@@ -120,7 +123,7 @@ public class FlumeEmbeddedManager extend
             }
         }
         return (FlumeEmbeddedManager) getManager(sb.toString(), factory,
-            new FactoryData(name, agents, properties, batchSize));
+            new FactoryData(name, agents, properties, batchSize, dataDir));
     }
 
     public void send(FlumeEvent event, int delay, int retries) {
@@ -139,6 +142,7 @@ public class FlumeEmbeddedManager extend
         private Agent[] agents;
         private Property[] properties;
         private int batchSize;
+        private String dataDir;
         private String name;
 
         /**
@@ -147,12 +151,14 @@ public class FlumeEmbeddedManager extend
          * @param agents The agents.
          * @param properties The Flume configuration properties.
          * @param batchSize The number of events to include in a batch.
+         * @param dataDir The directory where Flume should write to.
          */
-        public FactoryData(String name, Agent[] agents, Property[] properties, int batchSize) {
+        public FactoryData(String name, Agent[] agents, Property[] properties, int batchSize, String dataDir) {
             this.name = name;
             this.agents = agents;
             this.batchSize = batchSize;
             this.properties = properties;
+            this.dataDir = dataDir;
         }
     }
 
@@ -171,7 +177,8 @@ public class FlumeEmbeddedManager extend
         public FlumeEmbeddedManager createManager(String name, FactoryData data) {
             try {
                 DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
-                Properties props = createProperties(data.name, data.agents, data.properties, data.batchSize);
+                Properties props = createProperties(data.name, data.agents, data.properties, data.batchSize,
+                    data.dataDir);
                 FlumeConfigurationBuilder builder = new FlumeConfigurationBuilder();
                 NodeConfiguration conf = builder.load(data.name, props, nodeManager);
 
@@ -187,7 +194,8 @@ public class FlumeEmbeddedManager extend
             return null;
         }
 
-        private Properties createProperties(String name, Agent[] agents, Property[] properties, int batchSize) {
+        private Properties createProperties(String name, Agent[] agents, Property[] properties, int batchSize,
+                                            String dataDir) {
             Properties props = new Properties();
 
             if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
@@ -205,6 +213,14 @@ public class FlumeEmbeddedManager extend
                 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", sourceType);
                 props.put(name + ".channels", "file");
                 props.put(name + ".channels.file.type", "file");
+                if (dataDir != null && dataDir.length() > 0) {
+                    if (!dataDir.endsWith(LINE_SEP)) {
+                        dataDir = dataDir + LINE_SEP;
+                    }
+
+                    props.put(name + ".channels.file.checkpointDir", dataDir + "checkpoint");
+                    props.put(name + ".channels.file.dataDirs", dataDir + "data");
+                }
 
                 StringBuilder sb = new StringBuilder();
                 String leading = "";

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java?rev=1376372&r1=1376371&r2=1376372&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java Thu Aug 23 05:47:10 2012
@@ -51,7 +51,7 @@ public class FlumeEvent extends SimpleEv
 
     private static final String GUID = "guId";
 
-    private static final String TIMESTAMP = "timestamp";;
+    private static final String TIMESTAMP = "timeStamp";;
 
     private final LogEvent event;
 

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java?rev=1376372&r1=1376371&r2=1376372&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java Thu Aug 23 05:47:10 2012
@@ -60,5 +60,7 @@ public class Log4jEventSource extends Ab
             logger.warn("Unabled to process event {}" + event, ex);
             throw ex;
         }
+        sourceCounter.incrementAppendAcceptedCount();
+        sourceCounter.incrementEventAcceptedCount();
     }
 }

Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java?rev=1376372&r1=1376371&r2=1376372&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java Thu Aug 23 05:47:10 2012
@@ -123,8 +123,8 @@ public class FlumeAppenderTest {
     @Test
     public void testLog4jAvroAppender() throws InterruptedException, IOException {
         Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
-        FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "100", "3", "avro", "false",
-            null, null, null, null, null, "true", "1", null, null, null);
+        FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
+            "false", null, null, null, null, null, "true", "1", null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);
@@ -150,8 +150,8 @@ public class FlumeAppenderTest {
     @Test
     public void testMultiple() throws InterruptedException, IOException {
         Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
-        FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "100", "3", "avro", "false",
-            null, null, null, null, null, "true", "1", null, null, null);
+        FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
+            "false", null, null, null, null, null, "true", "1", null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);
@@ -180,8 +180,8 @@ public class FlumeAppenderTest {
      @Test
     public void testBatch() throws InterruptedException, IOException {
         Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
-        FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "100", "3", "avro", "false",
-            null, null, null, null, null, "true", "10", null, null, null);
+        FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
+            "false", null, null, null, null, null, "true", "10", null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);
@@ -211,8 +211,8 @@ public class FlumeAppenderTest {
     @Test
     public void testConnectionRefused() {
         Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
-        FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "100", "3", "avro", "false",
-            null, null, null, null, null, "true", "1", null, null, null);
+        FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
+            "false", null, null, null, null, null, "true", "1", null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);
@@ -238,8 +238,8 @@ public class FlumeAppenderTest {
         String altPort = Integer.toString(Integer.parseInt(testPort) + 1);
         Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort),
                                       Agent.createAgent("localhost", altPort)};
-        FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "100", "3", "avro", "false",
-            null, null, null, null, null, "true", "1", null, null, null);
+        FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
+            "false", null, null, null, null, null, "true", "1", null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);

Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java?rev=1376372&r1=1376371&r2=1376372&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java Thu Aug 23 05:47:10 2012
@@ -46,6 +46,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.management.ManagementFactory;
@@ -65,7 +66,8 @@ public class FlumeEmbeddedAgentTest {
 
     private AvroSource primarySource;
     private AvroSource altSource;
-    private Channel channel;
+    private Channel primaryChannel;
+    private Channel alternateChannel;
 
     private String testPort;
     private String altPort;
@@ -82,13 +84,19 @@ public class FlumeEmbeddedAgentTest {
 
     @Before
     public void setUp() throws Exception {
+        File file = new File("target/file-channel");
+        boolean result = deleteFiles(file);
         primarySource = new AvroSource();
         primarySource.setName("Primary");
         altSource = new AvroSource();
         altSource.setName("Alternate");
-        channel = new MemoryChannel();
+        primaryChannel = new MemoryChannel();
+        primaryChannel.setName("Primary Memory");
+        alternateChannel = new MemoryChannel();
+        alternateChannel.setName("Alternate Memory");
 
-        Configurables.configure(channel, new Context());
+        Configurables.configure(primaryChannel, new Context());
+        Configurables.configure(alternateChannel, new Context());
 
         /*
         * Clear out all other appenders associated with this logger to ensure we're
@@ -107,18 +115,24 @@ public class FlumeEmbeddedAgentTest {
         Configurables.configure(altSource, context);
 
         List<Channel> channels = new ArrayList<Channel>();
-        channels.add(channel);
+        channels.add(primaryChannel);
 
-        ChannelSelector cs = new ReplicatingChannelSelector();
-        cs.setChannels(channels);
+        ChannelSelector primaryCS = new ReplicatingChannelSelector();
+        primaryCS.setChannels(channels);
 
-        primarySource.setChannelProcessor(new ChannelProcessor(cs));
-        altSource.setChannelProcessor(new ChannelProcessor(cs));
+        List<Channel> altChannels = new ArrayList<Channel>();
+        altChannels.add(alternateChannel);
+
+        ChannelSelector alternateCS = new ReplicatingChannelSelector();
+        alternateCS.setChannels(altChannels);
+
+        primarySource.setChannelProcessor(new ChannelProcessor(primaryCS));
+        altSource.setChannelProcessor(new ChannelProcessor(alternateCS));
 
         primarySource.start();
         altSource.start();
 
-    	  Assert.assertTrue("Reached start or error", LifecycleController.waitForOneOf(
+        Assert.assertTrue("Reached start or error", LifecycleController.waitForOneOf(
             primarySource, LifecycleState.START_OR_ERROR));
         Assert.assertEquals("Server is started", LifecycleState.START, primarySource.getLifecycleState());
         System.setProperty(XMLConfigurationFactory.CONFIGURATION_FILE_PROPERTY, CONFIG);
@@ -132,10 +146,12 @@ public class FlumeEmbeddedAgentTest {
         ctx.reconfigure();
         primarySource.stop();
         altSource.stop();
-	      Assert.assertTrue("Reached stop or error",
-	           LifecycleController.waitForOneOf(primarySource, LifecycleState.STOP_OR_ERROR));
-	      Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+        Assert.assertTrue("Reached stop or error",
+            LifecycleController.waitForOneOf(primarySource, LifecycleState.STOP_OR_ERROR));
+        Assert.assertEquals("Server is stopped", LifecycleState.STOP,
             primarySource.getLifecycleState());
+        File file = new File("target/file-channel");
+        boolean result = deleteFiles(file);
         MBeanServer server = ManagementFactory.getPlatformMBeanServer();
         Set<ObjectName> names = server.queryNames(new ObjectName("org.apache.flume.*:*"), null);
         for (ObjectName name : names) {
@@ -153,18 +169,18 @@ public class FlumeEmbeddedAgentTest {
         StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Log4j", "Test");
         EventLogger.logEvent(msg);
 
-        Transaction transaction = channel.getTransaction();
+        Transaction transaction = primaryChannel.getTransaction();
         transaction.begin();
 
-        Event event = channel.take();
-   	    Assert.assertNotNull(event);
+        Event event = primaryChannel.take();
+        Assert.assertNotNull(event);
         String body = getBody(event);
-  	    Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
+        Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
             body.endsWith("Test Log4j"));
-	      transaction.commit();
-	      transaction.close();
+        transaction.commit();
+        transaction.close();
 
-	      primarySource.stop();
+        primarySource.stop();
     }
 
     @Test
@@ -175,15 +191,15 @@ public class FlumeEmbeddedAgentTest {
             EventLogger.logEvent(msg);
         }
         for (int i = 0; i < 10; ++i) {
-            Transaction transaction = channel.getTransaction();
+            Transaction transaction = primaryChannel.getTransaction();
             transaction.begin();
 
-            Event event = channel.take();
+            Event event = primaryChannel.take();
             Assert.assertNotNull(event);
             String body = getBody(event);
             String expected = "Test Multiple " + i;
-            Assert.assertTrue("Channel contained event, but not expected message. Expected: " + expected +
-                " Received: " + body, body.endsWith(expected));
+            Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
+                body.endsWith(expected));
             transaction.commit();
             transaction.close();
         }
@@ -191,6 +207,7 @@ public class FlumeEmbeddedAgentTest {
         primarySource.stop();
     }
 
+
     @Test
     public void testFailover() throws InterruptedException, IOException {
         Logger logger = LogManager.getLogger("testFailover");
@@ -200,15 +217,15 @@ public class FlumeEmbeddedAgentTest {
             EventLogger.logEvent(msg);
         }
         for (int i = 0; i < 10; ++i) {
-            Transaction transaction = channel.getTransaction();
+            Transaction transaction = primaryChannel.getTransaction();
             transaction.begin();
 
-            Event event = channel.take();
+            Event event = primaryChannel.take();
             Assert.assertNotNull(event);
             String body = getBody(event);
             String expected = "Test Primary " + i;
-            Assert.assertTrue("Channel contained event, but not expected message. Expected: " + expected +
-                " Received: " + body, body.endsWith(expected));
+            Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
+                body.endsWith(expected));
             transaction.commit();
             transaction.close();
         }
@@ -221,17 +238,17 @@ public class FlumeEmbeddedAgentTest {
             EventLogger.logEvent(msg);
         }
         for (int i = 0; i < 10; ++i) {
-            Transaction transaction = channel.getTransaction();
+            Transaction transaction = alternateChannel.getTransaction();
             transaction.begin();
 
-            Event event = channel.take();
+            Event event = alternateChannel.take();
             Assert.assertNotNull(event);
             String body = getBody(event);
             String expected = "Test Alternate " + i;
             /* When running in Gump Flume consistently returns the last event from the primary channel after
-               the failover, which fails this test
+               the failover, which fails this test */
             Assert.assertTrue("Channel contained event, but not expected message. Expected: " + expected +
-                " Received: " + body, body.endsWith(expected)); */
+                " Received: " + body, body.endsWith(expected));
             transaction.commit();
             transaction.close();
         }
@@ -240,12 +257,28 @@ public class FlumeEmbeddedAgentTest {
 
     private String getBody(Event event) throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody()));
-            int n = 0;
-            while (-1 != (n = is.read())) {
-                baos.write(n);
+        InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody()));
+        int n = 0;
+        while (-1 != (n = is.read())) {
+            baos.write(n);
+        }
+        return new String(baos.toByteArray());
+
+    }
+
+    /** Recursively deletes {@code file}; tolerates listFiles() returning null (I/O error). */
+    private boolean deleteFiles(File file) {
+        boolean result = true;
+        File[] files = file.isDirectory() ? file.listFiles() : null;
+        if (files != null) {
+            for (File child : files) {
+                result &= deleteFiles(child);
             }
-            return new String(baos.toByteArray());
 
+        } else if (!file.exists()) {
+            return false;
+        }
+        // Always attempt the delete; the result is false if this or any nested delete failed.
+        return result &= file.delete();
     }
 }

Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java?rev=1376372&r1=1376371&r2=1376372&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java Thu Aug 23 05:47:10 2012
@@ -46,6 +46,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.management.ManagementFactory;
@@ -65,11 +66,11 @@ public class FlumeEmbeddedAppenderTest {
 
     private AvroSource primarySource;
     private AvroSource altSource;
-    private Channel channel;
+    private Channel primaryChannel;
+    private Channel alternateChannel;
 
     private String testPort;
     private String altPort;
-    private int counter;
 
     @BeforeClass
     public static void setupClass() {
@@ -83,15 +84,19 @@ public class FlumeEmbeddedAppenderTest {
 
     @Before
     public void setUp() throws Exception {
+        File file = new File("target/file-channel");
+        boolean result = deleteFiles(file);
         primarySource = new AvroSource();
         primarySource.setName("Primary");
         altSource = new AvroSource();
         altSource.setName("Alternate");
-        channel = new MemoryChannel();
-        channel.setName("Memory");
-        ++counter;
+        primaryChannel = new MemoryChannel();
+        primaryChannel.setName("Primary Memory");
+        alternateChannel = new MemoryChannel();
+        alternateChannel.setName("Alternate Memory");
 
-        Configurables.configure(channel, new Context());
+        Configurables.configure(primaryChannel, new Context());
+        Configurables.configure(alternateChannel, new Context());
 
         /*
         * Clear out all other appenders associated with this logger to ensure we're
@@ -110,13 +115,19 @@ public class FlumeEmbeddedAppenderTest {
         Configurables.configure(altSource, context);
 
         List<Channel> channels = new ArrayList<Channel>();
-        channels.add(channel);
+        channels.add(primaryChannel);
 
-        ChannelSelector cs = new ReplicatingChannelSelector();
-        cs.setChannels(channels);
+        ChannelSelector primaryCS = new ReplicatingChannelSelector();
+        primaryCS.setChannels(channels);
 
-        primarySource.setChannelProcessor(new ChannelProcessor(cs));
-        altSource.setChannelProcessor(new ChannelProcessor(cs));
+        List<Channel> altChannels = new ArrayList<Channel>();
+        altChannels.add(alternateChannel);
+
+        ChannelSelector alternateCS = new ReplicatingChannelSelector();
+        alternateCS.setChannels(altChannels);
+
+        primarySource.setChannelProcessor(new ChannelProcessor(primaryCS));
+        altSource.setChannelProcessor(new ChannelProcessor(alternateCS));
 
         primarySource.start();
         altSource.start();
@@ -139,6 +150,8 @@ public class FlumeEmbeddedAppenderTest {
 	           LifecycleController.waitForOneOf(primarySource, LifecycleState.STOP_OR_ERROR));
 	      Assert.assertEquals("Server is stopped", LifecycleState.STOP,
             primarySource.getLifecycleState());
+        File file = new File("target/file-channel");
+        boolean result = deleteFiles(file);
         MBeanServer server = ManagementFactory.getPlatformMBeanServer();
         Set<ObjectName> names = server.queryNames(new ObjectName("org.apache.flume.*:*"), null);
         for (ObjectName name : names) {
@@ -156,10 +169,10 @@ public class FlumeEmbeddedAppenderTest {
         StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Log4j", "Test");
         EventLogger.logEvent(msg);
 
-        Transaction transaction = channel.getTransaction();
+        Transaction transaction = primaryChannel.getTransaction();
         transaction.begin();
 
-        Event event = channel.take();
+        Event event = primaryChannel.take();
    	    Assert.assertNotNull(event);
         String body = getBody(event);
   	    Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
@@ -178,10 +191,10 @@ public class FlumeEmbeddedAppenderTest {
             EventLogger.logEvent(msg);
         }
         for (int i = 0; i < 10; ++i) {
-            Transaction transaction = channel.getTransaction();
+            Transaction transaction = primaryChannel.getTransaction();
             transaction.begin();
 
-            Event event = channel.take();
+            Event event = primaryChannel.take();
             Assert.assertNotNull(event);
             String body = getBody(event);
             String expected = "Test Multiple " + i;
@@ -204,10 +217,10 @@ public class FlumeEmbeddedAppenderTest {
             EventLogger.logEvent(msg);
         }
         for (int i = 0; i < 10; ++i) {
-            Transaction transaction = channel.getTransaction();
+            Transaction transaction = primaryChannel.getTransaction();
             transaction.begin();
 
-            Event event = channel.take();
+            Event event = primaryChannel.take();
             Assert.assertNotNull(event);
             String body = getBody(event);
             String expected = "Test Primary " + i;
@@ -225,17 +238,17 @@ public class FlumeEmbeddedAppenderTest {
             EventLogger.logEvent(msg);
         }
         for (int i = 0; i < 10; ++i) {
-            Transaction transaction = channel.getTransaction();
+            Transaction transaction = alternateChannel.getTransaction();
             transaction.begin();
 
-            Event event = channel.take();
+            Event event = alternateChannel.take();
             Assert.assertNotNull(event);
             String body = getBody(event);
             String expected = "Test Alternate " + i;
             /* When running in Gump Flume consistently returns the last event from the primary channel after
-               the failover, which fails this test
+               the failover, which fails this test */
             Assert.assertTrue("Channel contained event, but not expected message. Expected: " + expected +
-                " Received: " + body, body.endsWith(expected)); */
+                " Received: " + body, body.endsWith(expected));
             transaction.commit();
             transaction.close();
         }
@@ -252,4 +265,20 @@ public class FlumeEmbeddedAppenderTest {
             return new String(baos.toByteArray());
 
     }
+
+    /** Recursively deletes {@code file}; returns false if it is missing or any delete fails. */
+    private boolean deleteFiles(File file) {
+        if (!file.exists()) {
+            return false;
+        }
+        boolean result = true;
+        // listFiles() yields null for plain files and on I/O errors, so guard before iterating.
+        File[] files = file.listFiles();
+        if (files != null) {
+            for (File child : files) {
+                result &= deleteFiles(child);
+            }
+        }
+        return file.delete() && result;
+    }
 }

Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml?rev=1376372&r1=1376371&r2=1376372&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml Thu Aug 23 05:47:10 2012
@@ -1,7 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <configuration status="warn" name="MyApp" packages="">
   <appenders>
-    <Flume name="eventLogger" suppressExceptions="false" compress="true" embedded="true">
+    <Flume name="eventLogger" suppressExceptions="false" compress="true" embedded="true" dataDir="target/file-channel">
       <Agent host="localhost" port="12345"/>
       <Agent host="localhost" port="12346"/>
       <RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>

Modified: logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml?rev=1376372&r1=1376371&r2=1376372&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml (original)
+++ logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml Thu Aug 23 05:47:10 2012
@@ -333,6 +333,12 @@
               <td>When set to true the message body will be compressed using gzip</td>
             </tr>
             <tr>
+              <td>dataDir</td>
+              <td>String</td>
+              <td>Directory where the Flume write-ahead log should be written. Valid only when embedded is set
+                to true and Agent elements are used instead of Property elements.</td>
+            </tr>
+            <tr>
               <td>embedded</td>
               <td>boolean</td>
               <td>When set to true the embedded Flume agent will be used. When Agent elements are used the events