You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by rg...@apache.org on 2012/08/19 09:37:51 UTC

svn commit: r1374701 - in /logging/log4j/log4j2/trunk: ./ core/src/main/java/org/apache/logging/log4j/core/ core/src/main/java/org/apache/logging/log4j/core/config/plugins/ core/src/main/java/org/apache/logging/log4j/core/impl/ core/src/main/java/org/a...

Author: rgoers
Date: Sun Aug 19 07:37:50 2012
New Revision: 1374701

URL: http://svn.apache.org/viewvc?rev=1374701&view=rev
Log:
Fix FLUME-69: allow a Flume agent to be embedded in the Flume appender.

Added:
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
      - copied, changed from r1371545, logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppender.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeConfigurationBuilder.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeManager.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeNode.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java
    logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
      - copied, changed from r1371545, logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppenderTest.java
    logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java
    logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
    logging/log4j/log4j2/trunk/flume-ng/src/test/resources/
    logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml
    logging/log4j/log4j2/trunk/flume-ng/src/test/resources/embedded.xml
Removed:
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppender.java
    logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppenderTest.java
Modified:
    logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/LoggerContext.java
    logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/config/plugins/PluginManager.java
    logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/impl/Log4jContextFactory.java
    logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/layout/RFC5424Layout.java
    logging/log4j/log4j2/trunk/flume-ng/pom.xml
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
    logging/log4j/log4j2/trunk/pom.xml
    logging/log4j/log4j2/trunk/src/changes/changes.xml

Modified: logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/LoggerContext.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/LoggerContext.java?rev=1374701&r1=1374700&r2=1374701&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/LoggerContext.java (original)
+++ logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/LoggerContext.java Sun Aug 19 07:37:50 2012
@@ -27,6 +27,8 @@ import java.io.File;
 import java.net.URI;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * The LoggerContext is the anchor for the logging system. It maintains a list of all the loggers requested by
@@ -51,7 +53,17 @@ public class LoggerContext implements or
 
     private final URI configLocation;
 
-    private boolean isStarted;
+    public enum Status {
+        INITIALIZED,
+        STARTING,
+        STARTED,
+        STOPPING,
+        STOPPED
+    }
+
+    private volatile Status status = Status.INITIALIZED;
+
+    private Lock configLock = new ReentrantLock();
 
     /**
      * Constructor taking only a name.
@@ -106,19 +118,38 @@ public class LoggerContext implements or
     }
 
     public void start() {
-        reconfigure();
-        isStarted = true;
+        if (configLock.tryLock()) {
+            try {
+                if (status == Status.INITIALIZED) {
+                    status = Status.STARTING;
+                    reconfigure();
+                    status = Status.STARTED;
+                }
+            } finally {
+                configLock.unlock();
+            }
+        }
+    }
+
+    public void stop() {
+        configLock.lock();
+        try {
+            status = Status.STOPPING;
+            updateLoggers(new NullConfiguration());
+            config.stop();
+            externalContext = null;
+            status = Status.STOPPED;
+        } finally {
+            configLock.unlock();
+        }
     }
 
-    public synchronized void stop() {
-        isStarted = false;
-        updateLoggers(new NullConfiguration());
-        config.stop();
-        externalContext = null;
+    public Status getStatus() {
+        return status;
     }
 
     public boolean isStarted() {
-        return isStarted;
+        return status == Status.STARTED;
     }
 
     /**

Modified: logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/config/plugins/PluginManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/config/plugins/PluginManager.java?rev=1374701&r1=1374700&r2=1374701&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/config/plugins/PluginManager.java (original)
+++ logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/config/plugins/PluginManager.java Sun Aug 19 07:37:50 2012
@@ -209,7 +209,10 @@ public class PluginManager {
                 for (int j = 0; j < count; ++j) {
                     String type = dis.readUTF();
                     int entries = dis.readInt();
-                    ConcurrentMap<String, PluginType> types = new ConcurrentHashMap<String, PluginType>(count);
+                    ConcurrentMap<String, PluginType> types = map.get(type);
+                    if (types == null) {
+                        types = new ConcurrentHashMap<String, PluginType>(count);
+                    }
                     for (int i = 0; i < entries; ++i) {
                         String key = dis.readUTF();
                         String className = dis.readUTF();

Modified: logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/impl/Log4jContextFactory.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/impl/Log4jContextFactory.java?rev=1374701&r1=1374700&r2=1374701&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/impl/Log4jContextFactory.java (original)
+++ logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/impl/Log4jContextFactory.java Sun Aug 19 07:37:50 2012
@@ -33,8 +33,6 @@ public class Log4jContextFactory impleme
 
     private StatusLogger logger = StatusLogger.getLogger();
 
-    private ThreadLocal<Log4jContextFactory> recursive = new ThreadLocal<Log4jContextFactory>();
-
     /**
      * Constructor that initializes the ContextSelector.
      */
@@ -72,17 +70,9 @@ public class Log4jContextFactory impleme
      */
     public LoggerContext getContext(String fqcn, boolean currentContext) {
         LoggerContext ctx = selector.getContext(fqcn, currentContext);
-        synchronized (ctx) {
-            if (recursive.get() != null || ctx.isStarted()) {
-                return ctx;
-            }
-            try {
-                recursive.set(this);
-                ctx.start();
-                return ctx;
-            } finally {
-                recursive.remove();
-            }
+        if (ctx.getStatus() == LoggerContext.Status.INITIALIZED) {
+            ctx.start();
         }
+        return ctx;
     }
 }

Modified: logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/layout/RFC5424Layout.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/layout/RFC5424Layout.java?rev=1374701&r1=1374700&r2=1374701&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/layout/RFC5424Layout.java (original)
+++ logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/layout/RFC5424Layout.java Sun Aug 19 07:37:50 2012
@@ -30,10 +30,13 @@ import org.apache.logging.log4j.message.
 import org.apache.logging.log4j.message.StructuredDataMessage;
 
 import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
 import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.Enumeration;
 import java.util.GregorianCalendar;
 import java.util.List;
 import java.util.Map;
@@ -228,6 +231,25 @@ public final class RFC5424Layout extends
             InetAddress addr = InetAddress.getLocalHost();
             return addr.getHostName();
         } catch (UnknownHostException uhe) {
+            try {
+                Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
+                while (interfaces.hasMoreElements()) {
+                    NetworkInterface nic = interfaces.nextElement();
+                    Enumeration<InetAddress> addresses = nic.getInetAddresses();
+                    while (addresses.hasMoreElements()) {
+                        InetAddress address = addresses.nextElement();
+                        if (!address.isLoopbackAddress()) {
+                            String hostname = address.getHostName();
+                            if (hostname != null) {
+                                return hostname;
+                            }
+                        }
+                    }
+                }
+            } catch (SocketException se) {
+                LOGGER.error("Could not determine local host name", uhe);
+                return "UNKNOWN_LOCALHOST";
+            }
             LOGGER.error("Could not determine local host name", uhe);
             return "UNKNOWN_LOCALHOST";
         }

Modified: logging/log4j/log4j2/trunk/flume-ng/pom.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/pom.xml?rev=1374701&r1=1374700&r2=1374701&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/pom.xml (original)
+++ logging/log4j/log4j2/trunk/flume-ng/pom.xml Sun Aug 19 07:37:50 2012
@@ -32,6 +32,7 @@
     <log4jParentDir>${basedir}/..</log4jParentDir>
     <docLabel>Flume Documentation</docLabel>
     <projectDir>/flume-ng</projectDir>
+    <flumeVersion>1.2.0</flumeVersion>
   </properties>
   <dependencies>
     <dependency>
@@ -47,6 +48,10 @@
       <artifactId>slf4j-impl</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j12-api</artifactId>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>4.7</version>
@@ -55,14 +60,45 @@
     <dependency>
       <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-core</artifactId>
-      <version>1.2.0</version>
+      <version>${flumeVersion}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-node</artifactId>
+      <version>${flumeVersion}</version>
       <exclusions>
         <exclusion>
           <groupId>org.slf4j</groupId>
           <artifactId>slf4j-log4j12</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flume.flume-ng-channels</groupId>
+      <artifactId>flume-file-channel</artifactId>
+      <version>${flumeVersion}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-core</artifactId>
+      <version>1.0.1</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>

Copied: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java (from r1371545, logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppender.java)
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java?p2=logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java&p1=logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppender.java&r1=1371545&r2=1374701&rev=1374701&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppender.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java Sun Aug 19 07:37:50 2012
@@ -20,25 +20,20 @@ import org.apache.logging.log4j.core.Fil
 import org.apache.logging.log4j.core.Layout;
 import org.apache.logging.log4j.core.LogEvent;
 import org.apache.logging.log4j.core.appender.AppenderBase;
+import org.apache.logging.log4j.core.config.Property;
 import org.apache.logging.log4j.core.config.plugins.Plugin;
 import org.apache.logging.log4j.core.config.plugins.PluginAttr;
 import org.apache.logging.log4j.core.config.plugins.PluginElement;
 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
 import org.apache.logging.log4j.core.layout.RFC5424Layout;
 
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.util.Enumeration;
-
 /**
  * An Appender that uses the Avro protocol to route events to Flume.
  */
 @Plugin(name = "Flume", type = "Core", elementType = "appender", printObject = true)
-public final class FlumeAvroAppender extends AppenderBase implements FlumeEventFactory {
+public final class FlumeAppender extends AppenderBase implements FlumeEventFactory {
 
-    private FlumeAvroManager manager;
+    private FlumeManager manager;
 
     private final String mdcIncludes;
     private final String mdcExcludes;
@@ -50,18 +45,16 @@ public final class FlumeAvroAppender ext
 
     private final boolean compressBody;
 
-    private final String hostname;
-
     private final int reconnectDelay;
 
     private final int retries;
 
     private final FlumeEventFactory factory;
 
-    private FlumeAvroAppender(String name, Filter filter, Layout layout, boolean handleException,
-                              String hostname, String includes, String excludes, String required, String mdcPrefix,
-                              String eventPrefix, boolean compress, int delay, int retries,
-                              FlumeEventFactory factory, FlumeAvroManager manager) {
+    private FlumeAppender(String name, Filter filter, Layout layout, boolean handleException,
+                          String includes, String excludes, String required, String mdcPrefix,
+                          String eventPrefix, boolean compress, int delay, int retries,
+                          FlumeEventFactory factory, FlumeManager manager) {
         super(name, filter, layout, handleException);
         this.manager = manager;
         this.mdcIncludes = includes;
@@ -70,7 +63,6 @@ public final class FlumeAvroAppender ext
         this.eventPrefix = eventPrefix;
         this.mdcPrefix = mdcPrefix;
         this.compressBody = compress;
-        this.hostname = hostname;
         this.reconnectDelay = delay;
         this.retries = retries;
         this.factory = factory == null ? this : factory;
@@ -131,7 +123,9 @@ public final class FlumeAvroAppender ext
      * @return A Flume Avro Appender.
      */
     @PluginFactory
-    public static FlumeAvroAppender createAppender(@PluginElement("agents") Agent[] agents,
+    public static FlumeAppender createAppender(@PluginElement("agents") Agent[] agents,
+                                                   @PluginElement("properties") Property[] properties,
+                                                   @PluginAttr("embedded") String embedded,
                                                    @PluginAttr("reconnectionDelay") String delay,
                                                    @PluginAttr("agentRetries") String agentRetries,
                                                    @PluginAttr("name") String name,
@@ -147,18 +141,8 @@ public final class FlumeAvroAppender ext
                                                    @PluginElement("layout") Layout layout,
                                                    @PluginElement("filters") Filter filter) {
 
-        String hostname;
-        try {
-            hostname = getHostName();
-        } catch (Exception ex) {
-            LOGGER.error("Unable to determine local hostname", ex);
-            return null;
-        }
-        if (agents == null || agents.length == 0) {
-            LOGGER.debug("No agents provided, using defaults");
-            agents = new Agent[] {Agent.createAgent(null, null)};
-        }
-
+        boolean embed = embedded != null ? Boolean.valueOf(embedded) :
+            (agents == null || agents.length == 0) && properties != null && properties.length > 0;
         boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
         boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody);
 
@@ -176,37 +160,23 @@ public final class FlumeAvroAppender ext
             return null;
         }
 
-        FlumeAvroManager manager = FlumeAvroManager.getManager(agents, batchCount);
-        if (manager == null) {
-            return null;
-        }
+        FlumeManager manager;
 
-        return new FlumeAvroAppender(name, filter, layout,  handleExceptions, hostname, includes,
-            excludes, required, mdcPrefix, eventPrefix, compress, reconnectDelay, retries, factory, manager);
-    }
-
-    private static String getHostName() throws Exception {
-        try {
-            return InetAddress.getLocalHost().getHostName();
-        } catch (Exception ex) {
-            // Could not locate host the easy way.
+        if (embed) {
+            manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount);
+        } else {
+            if (agents == null || agents.length == 0) {
+                LOGGER.debug("No agents provided, using defaults");
+                agents = new Agent[] {Agent.createAgent(null, null)};
+            }
+            manager = FlumeAvroManager.getManager(name, agents, batchCount);
         }
 
-        Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
-        while (interfaces.hasMoreElements()) {
-            NetworkInterface nic = interfaces.nextElement();
-            Enumeration<InetAddress> addresses = nic.getInetAddresses();
-            while (addresses.hasMoreElements()) {
-                InetAddress address = addresses.nextElement();
-                if (!address.isLoopbackAddress()) {
-                    String hostname = address.getHostName();
-                    if (hostname != null) {
-                        return hostname;
-                    }
-                }
-            }
+        if (manager == null) {
+            return null;
         }
-        throw new UnknownHostException("Unable to determine host name");
 
+        return new FlumeAppender(name, filter, layout,  handleExceptions, includes,
+            excludes, required, mdcPrefix, eventPrefix, compress, reconnectDelay, retries, factory, manager);
     }
 }

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java?rev=1374701&r1=1374700&r2=1374701&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java Sun Aug 19 07:37:50 2012
@@ -38,7 +38,7 @@ import java.util.Map;
 /**
  * Manager for FlumeAvroAppenders.
  */
-public class FlumeAvroManager extends AbstractManager {
+public class FlumeAvroManager extends FlumeManager {
 
     /**
       The default reconnection delay (500 milliseconds or .5 seconds).
@@ -67,7 +67,7 @@ public class FlumeAvroManager extends Ab
      * @param agents An array of Agents.
      * @param batchSize The number of events to include in a batch.
      */
-    protected FlumeAvroManager(String name, Agent[] agents, int batchSize) {
+    protected FlumeAvroManager(String name, String shortName, Agent[] agents, int batchSize) {
         super(name);
         this.agents = agents;
         this.batchSize = batchSize;
@@ -80,7 +80,7 @@ public class FlumeAvroManager extends Ab
      * @param batchSize The number of events to include in a batch.
      * @return A FlumeAvroManager.
      */
-    public static FlumeAvroManager getManager(Agent[] agents, int batchSize) {
+    public static FlumeAvroManager getManager(String name, Agent[] agents, int batchSize) {
         if (agents == null || agents.length == 0) {
             throw new IllegalArgumentException("At least one agent is required");
         }
@@ -99,7 +99,7 @@ public class FlumeAvroManager extends Ab
             first = false;
         }
         sb.append("]");
-        return (FlumeAvroManager) getManager(sb.toString(), factory, new FactoryData(agents, batchSize));
+        return (FlumeAvroManager) getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize));
     }
 
     /**
@@ -118,7 +118,7 @@ public class FlumeAvroManager extends Ab
         return current;
     }
 
-    protected synchronized void send(FlumeEvent event, int delay, int retries)  {
+    public synchronized void send(FlumeEvent event, int delay, int retries)  {
         if (delay == 0) {
             delay = DEFAULT_RECONNECTION_DELAY;
         }
@@ -276,15 +276,18 @@ public class FlumeAvroManager extends Ab
      * Factory data.
      */
     private static class FactoryData {
+        private String name;
         private Agent[] agents;
         private int batchSize;
 
         /**
          * Constructor.
+         * @param name The name of the Appender.
          * @param agents The agents.
          * @param batchSize The number of events to include in a batch.
          */
-        public FactoryData(Agent[] agents, int batchSize) {
+        public FactoryData(String name, Agent[] agents, int batchSize) {
+            this.name = name;
             this.agents = agents;
             this.batchSize = batchSize;
         }
@@ -304,7 +307,7 @@ public class FlumeAvroManager extends Ab
         public FlumeAvroManager createManager(String name, FactoryData data) {
             try {
 
-                return new FlumeAvroManager(name, data.agents, data.batchSize);
+                return new FlumeAvroManager(name, data.name, data.agents, data.batchSize);
             } catch (Exception ex) {
                 LOGGER.error("Could not create FlumeAvroManager", ex);
             }

Added: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeConfigurationBuilder.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeConfigurationBuilder.java?rev=1374701&view=auto
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeConfigurationBuilder.java (added)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeConfigurationBuilder.java Sun Aug 19 07:37:50 2012
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelFactory;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Sink;
+import org.apache.flume.SinkFactory;
+import org.apache.flume.SinkProcessor;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.Source;
+import org.apache.flume.SourceFactory;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.ChannelSelectorFactory;
+import org.apache.flume.channel.DefaultChannelFactory;
+import org.apache.flume.conf.BasicConfigurationConstants;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.conf.FlumeConfigurationError;
+import org.apache.flume.conf.channel.ChannelSelectorConfiguration;
+import org.apache.flume.conf.file.SimpleNodeConfiguration;
+import org.apache.flume.conf.sink.SinkConfiguration;
+import org.apache.flume.conf.sink.SinkGroupConfiguration;
+import org.apache.flume.conf.source.SourceConfiguration;
+import org.apache.flume.node.NodeConfiguration;
+import org.apache.flume.node.nodemanager.NodeConfigurationAware;
+import org.apache.flume.sink.DefaultSinkFactory;
+import org.apache.flume.sink.DefaultSinkProcessor;
+import org.apache.flume.sink.SinkGroup;
+import org.apache.flume.source.DefaultSourceFactory;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.config.ConfigurationException;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * See Flume's PropertiesFileConfigurationProvider. This class would extend that if it were possible.
+ */
+
+public class FlumeConfigurationBuilder {
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final ChannelFactory channelFactory = new DefaultChannelFactory();
+    private final SourceFactory sourceFactory = new DefaultSourceFactory();
+    private final SinkFactory sinkFactory = new DefaultSinkFactory();
+
+    public NodeConfiguration load(String name, Properties props, NodeConfigurationAware configurationAware) {
+        NodeConfiguration conf = new SimpleNodeConfiguration();
+        FlumeConfiguration fconfig = new FlumeConfiguration(props);
+        List<FlumeConfigurationError> errors = fconfig.getConfigurationErrors();
+        if (errors.size() > 0) {
+            boolean isError = false;
+            for (FlumeConfigurationError error : errors) {
+                StringBuilder sb = new StringBuilder();
+                sb.append(error.getComponentName()).append(" ").append(error.getKey()).append(" ");
+                sb.append(error.getErrorType().name()).append(" - ").append(error.getErrorType().getError());
+                switch (error.getErrorOrWarning()) {
+                    case ERROR:
+                        isError = true;
+                        LOGGER.error(sb.toString());
+                        break;
+                    case WARNING:
+                        LOGGER.warn(sb.toString());
+                        break;
+                }
+            }
+            if (isError) {
+                for (String key : props.stringPropertyNames()) {
+                    LOGGER.error(key + "=" + props.getProperty(key));
+                }
+                throw new ConfigurationException("Unable to configure Flume due to errors");
+            }
+        }
+        FlumeConfiguration.AgentConfiguration agentConf = fconfig.getConfigurationFor(name);
+
+        if (agentConf != null) {
+
+            loadChannels(agentConf, conf);
+            loadSources(agentConf, conf);
+            loadSinks(agentConf, conf);
+
+            configurationAware.startAllComponents(conf);
+        } else {
+            LOGGER.warn("No configuration found for: {}", name);
+        }
+        return conf;
+    }
+
+    protected void loadChannels(FlumeConfiguration.AgentConfiguration agentConf, NodeConfiguration conf) {
+        LOGGER.info("Creating channels");
+        Set<String> channels = agentConf.getChannelSet();
+        Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap();
+        for (String chName : channels) {
+            ComponentConfiguration comp = compMap.get(chName);
+            if (comp != null) {
+                Channel channel = channelFactory.create(comp.getComponentName(), comp.getType());
+
+                Configurables.configure(channel, comp);
+
+                conf.getChannels().put(comp.getComponentName(), channel);
+            }
+        }
+
+        for (String ch : channels) {
+            Context context = agentConf.getChannelContext().get(ch);
+            if (context != null) {
+                Channel channel = channelFactory.create(ch, context.getString(BasicConfigurationConstants.CONFIG_TYPE));
+                Configurables.configure(channel, context);
+                conf.getChannels().put(ch, channel);
+                LOGGER.info("created channel " + ch);
+            }
+        }
+    }
+
+    protected void loadSources(FlumeConfiguration.AgentConfiguration agentConf, NodeConfiguration conf) {
+
+        Set<String> sources = agentConf.getSourceSet();
+        Map<String, ComponentConfiguration> compMap = agentConf.getSourceConfigMap();
+        for (String sourceName : sources) {
+            ComponentConfiguration comp = compMap.get(sourceName);
+            if (comp != null) {
+                SourceConfiguration config = (SourceConfiguration) comp;
+
+                Source source = sourceFactory.create(comp.getComponentName(), comp.getType());
+
+                Configurables.configure(source, config);
+                Set<String> channelNames = config.getChannels();
+                List<Channel> channels = new ArrayList<Channel>();
+                for (String chName : channelNames) {
+                    channels.add(conf.getChannels().get(chName));
+                }
+
+                ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();
+
+                ChannelSelector selector = ChannelSelectorFactory.create(channels, selectorConfig);
+
+                ChannelProcessor channelProcessor = new ChannelProcessor(selector);
+                Configurables.configure(channelProcessor, config);
+
+                source.setChannelProcessor(channelProcessor);
+                conf.getSourceRunners().put(comp.getComponentName(), SourceRunner.forSource(source));
+            }
+        }
+        Map<String, Context> sourceContexts = agentConf.getSourceContext();
+
+        for (String src : sources) {
+            Context context = sourceContexts.get(src);
+            if (context != null){
+                Source source = sourceFactory.create(src, context.getString(BasicConfigurationConstants.CONFIG_TYPE));
+                List<Channel> channels = new ArrayList<Channel>();
+                Configurables.configure(source, context);
+                String[] channelNames = context.getString(BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
+                for (String chName : channelNames) {
+                    channels.add(conf.getChannels().get(chName));
+                }
+
+                Map<String, String> selectorConfig = context.getSubProperties(
+                    BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
+
+                ChannelSelector selector = ChannelSelectorFactory.create(channels, selectorConfig);
+
+                ChannelProcessor channelProcessor = new ChannelProcessor(selector);
+                Configurables.configure(channelProcessor, context);
+
+                source.setChannelProcessor(channelProcessor);
+                conf.getSourceRunners().put(src, SourceRunner.forSource(source));
+            }
+        }
+    }
+
+    protected void loadSinks(FlumeConfiguration.AgentConfiguration agentConf, NodeConfiguration conf) {
+        Set<String> sinkNames = agentConf.getSinkSet();
+        Map<String, ComponentConfiguration> compMap = agentConf.getSinkConfigMap();
+        Map<String, Sink> sinks = new HashMap<String, Sink>();
+        for (String sinkName : sinkNames) {
+            ComponentConfiguration comp = compMap.get(sinkName);
+            if (comp != null) {
+                SinkConfiguration config = (SinkConfiguration) comp;
+                Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());
+
+                Configurables.configure(sink, config);
+
+                sink.setChannel(conf.getChannels().get(config.getChannel()));
+                sinks.put(comp.getComponentName(), sink);
+            }
+        }
+
+        Map<String, Context> sinkContexts = agentConf.getSinkContext();
+        for (String sinkName : sinkNames) {
+            Context context = sinkContexts.get(sinkName);
+            if (context != null) {
+                Sink sink = sinkFactory.create(sinkName, context.getString(BasicConfigurationConstants.CONFIG_TYPE));
+                Configurables.configure(sink, context);
+
+                sink.setChannel(conf.getChannels().get(context.getString(BasicConfigurationConstants.CONFIG_CHANNEL)));
+                sinks.put(sinkName, sink);
+            }
+        }
+
+        loadSinkGroups(agentConf, sinks, conf);
+    }
+
+    /**
+     * Registers a SinkRunner for every sink group and for every sink that does not belong to a group.
+     * Sinks that are assigned to a group are removed from {@code sinks}; whatever remains afterwards is
+     * wrapped in its own DefaultSinkProcessor.
+     * @param agentConf The agent configuration that lists the sink groups.
+     * @param sinks The sinks that were created, keyed by sink name. This map is modified.
+     * @param conf The node configuration that receives the sink runners.
+     * @throws ConfigurationException if a group references a sink that is unknown or already used by
+     * another group.
+     */
+    protected void loadSinkGroups(FlumeConfiguration.AgentConfiguration agentConf,
+                                  Map<String, Sink> sinks, NodeConfiguration conf) {
+        Set<String> sinkgroupNames = agentConf.getSinkgroupSet();
+        Map<String, ComponentConfiguration> compMap = agentConf.getSinkGroupConfigMap();
+        // Maps the name of each sink that has been claimed to the name of the group that claimed it.
+        Map<String, String> usedSinks = new HashMap<String, String>();
+        for (String groupName : sinkgroupNames) {
+            ComponentConfiguration comp = compMap.get(groupName);
+            if (comp != null) {
+                SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
+                List<String> groupSinkList = groupConf.getSinks();
+                List<Sink> groupSinks = new ArrayList<Sink>();
+                for (String sink : groupSinkList) {
+                    Sink s = sinks.remove(sink);
+                    if (s == null) {
+                        String sinkUser = usedSinks.get(sink);
+                        if (sinkUser != null) {
+                            throw new ConfigurationException(String.format(
+                                "Sink %s of group %s already in use by group %s", sink, groupName, sinkUser));
+                        } else {
+                            throw new ConfigurationException(String.format(
+                                "Sink %s of group %s does not exist or is not properly configured", sink,
+                                groupName));
+                        }
+                    }
+                    groupSinks.add(s);
+                    usedSinks.put(sink, groupName);
+                }
+                SinkGroup group = new SinkGroup(groupSinks);
+                Configurables.configure(group, groupConf);
+                conf.getSinkRunners().put(comp.getComponentName(), new SinkRunner(group.getProcessor()));
+            }
+        }
+        // add any unassigned sinks to solo collectors
+        for (Map.Entry<String, Sink> entry : sinks.entrySet()) {
+            // usedSinks is keyed by sink name, so the key (not the value, which is a group name) is checked.
+            if (!usedSinks.containsKey(entry.getKey())) {
+                SinkProcessor pr = new DefaultSinkProcessor();
+                List<Sink> sinkMap = new ArrayList<Sink>();
+                sinkMap.add(entry.getValue());
+                pr.setSinks(sinkMap);
+                Configurables.configure(pr, new Context());
+                conf.getSinkRunners().put(entry.getKey(), new SinkRunner(pr));
+            }
+        }
+    }
+}

Added: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java?rev=1374701&view=auto
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java (added)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java Sun Aug 19 07:37:50 2012
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import org.apache.flume.SourceRunner;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.node.NodeConfiguration;
+import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
+import org.apache.logging.log4j.core.appender.ManagerFactory;
+import org.apache.logging.log4j.core.config.ConfigurationException;
+import org.apache.logging.log4j.core.config.Property;
+
+import java.security.MessageDigest;
+import java.util.Properties;
+
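+/**
+ * Manager that runs a Flume node embedded in the application and passes events to it through a
+ * {@link Log4jEventSource}.
+ */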
+public class FlumeEmbeddedManager extends FlumeManager {
+
+    // Factory passed to getManager(...) to create FlumeEmbeddedManager instances.
+    private static ManagerFactory factory = new FlumeManagerFactory();
+
+    // The embedded Flume node; stopped in releaseSub().
+    private final FlumeNode node;
+
+    // NOTE(review): never assigned or read within this class - appears to be unused.
+    private NodeConfiguration conf;
+
+    // Name under which the Log4j source is registered in the generated Flume configuration.
+    protected static final String SOURCE_NAME = "log4j-source";
+
+    // The source that events are handed to in send().
+    private final Log4jEventSource source;
+
+    // Name of the Appender; used in the constructor's error message.
+    private final String shortName;
+
+
+    /**
+     * Constructor
+     * @param name The unique name of this manager.
+     * @param node The Flume Node.
+     */
+    protected FlumeEmbeddedManager(String name, String shortName, FlumeNode node) {
+        super(name);
+        this.node = node;
+        this.shortName = shortName;
+        SourceRunner runner = node.getConfiguration().getSourceRunners().get(SOURCE_NAME);
+        if (runner == null || runner.getSource() == null) {
+            throw new IllegalStateException("No Source has been created for Appender " + shortName);
+        }
+        source  = (Log4jEventSource) runner.getSource();
+    }
+
+    /**
+     * Return a FlumeEmbeddedManager. Exactly one of {@code agents} and {@code properties} must be supplied.
+     * The manager is keyed by the list of agents or, when properties are used, by the appender name
+     * followed by an MD5 digest of the property names and values.
+     * @param name The name of the Appender.
+     * @param agents The agents to use.
+     * @param properties The Flume configuration properties.
+     * @param batchSize The number of events to include in a batch. Values less than 1 are treated as 1.
+     * @return A FlumeEmbeddedManager.
+     * @throws IllegalArgumentException if neither or both of agents and properties are provided.
+     */
+    public static FlumeEmbeddedManager getManager(String name, Agent[] agents, Property[] properties, int batchSize) {
+
+        if (batchSize <= 0) {
+            batchSize = 1;
+        }
+
+        if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
+            throw new IllegalArgumentException("Either an Agent or properties are required");
+        } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
+            throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
+        }
+
+        StringBuilder sb = new StringBuilder();
+        boolean first = true;
+
+        if (agents != null && agents.length > 0) {
+            sb.append("FlumeEmbedded[");
+            for (Agent agent : agents) {
+                if (!first) {
+                    sb.append(",");
+                }
+                sb.append(agent.getHost()).append(":").append(agent.getPort());
+                first = false;
+            }
+            sb.append("]");
+        } else {
+            String sep = "";
+            sb.append(name).append(":");
+            StringBuilder props = new StringBuilder();
+            for (Property prop : properties) {
+                props.append(sep);
+                props.append(prop.getName()).append("=").append(prop.getValue());
+                sep = ",";
+            }
+            try {
+                MessageDigest digest = MessageDigest.getInstance("MD5");
+                // Digest the properties themselves so that different property sets yield different manager names.
+                digest.update(props.toString().getBytes());
+                byte[] bytes = digest.digest();
+                StringBuilder md5 = new StringBuilder();
+                for (byte b : bytes) {
+                    String hex = Integer.toHexString(0xff & b);
+                    if (hex.length() == 1) {
+                        md5.append('0');
+                    }
+                    md5.append(hex);
+                }
+                sb.append(md5.toString());
+            } catch (Exception ex) {
+                // Fall back to the raw property string when the digest cannot be computed.
+                sb.append(props);
+            }
+        }
+        return (FlumeEmbeddedManager) getManager(sb.toString(), factory,
+            new FactoryData(name, agents, properties, batchSize));
+    }
+
+    /**
+     * Hands the event to the embedded Log4jEventSource.
+     * @param event The event to send.
+     * @param delay Not used by this implementation.
+     * @param retries Not used by this implementation.
+     */
+    public void send(FlumeEvent event, int delay, int retries) {
+        source.send(event);
+    }
+
+    /**
+     * Stops the embedded Flume node.
+     */
+    @Override
+    protected void releaseSub() {
+        node.stop();
+    }
+
+    /**
+     * Factory data. Holds the values passed to getManager so that the factory can create the manager.
+     * The arrays are stored as passed; they are not copied.
+     */
+    private static class FactoryData {
+        private final Agent[] agents;
+        private final Property[] properties;
+        private final int batchSize;
+        private final String name;
+
+        /**
+         * Constructor.
+         * @param name The name of the Appender.
+         * @param agents The agents.
+         * @param properties The Flume configuration properties.
+         * @param batchSize The number of events to include in a batch.
+         */
+        public FactoryData(String name, Agent[] agents, Property[] properties, int batchSize) {
+            this.name = name;
+            this.agents = agents;
+            this.batchSize = batchSize;
+            this.properties = properties;
+        }
+    }
+
+    /**
+     * Embedded Manager Factory.
+     */
+    private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
+        private static final String sourceType = Log4jEventSource.class.getName();
+
+        /**
+         * Create the FlumeEmbeddedManager. Builds the Flume configuration from the factory data, starts a
+         * FlumeNode for it and wraps the node in a manager. Any failure is logged; if the node had already
+         * been created it is stopped again so that it is not left running without a manager.
+         * @param name The name of the entity to manage.
+         * @param data The data required to create the entity.
+         * @return The FlumeEmbeddedManager, or null if it could not be created.
+         */
+        public FlumeEmbeddedManager createManager(String name, FactoryData data) {
+            FlumeNode node = null;
+            try {
+                DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
+                Properties props = createProperties(data.name, data.agents, data.properties, data.batchSize);
+                FlumeConfigurationBuilder builder = new FlumeConfigurationBuilder();
+                NodeConfiguration conf = builder.load(data.name, props, nodeManager);
+
+                node = new FlumeNode(nodeManager, conf);
+
+                node.start();
+                LifecycleController.waitForOneOf(node, LifecycleState.START_OR_ERROR);
+
+                return new FlumeEmbeddedManager(name, data.name, node);
+            } catch (Exception ex) {
+                LOGGER.error("Could not create FlumeEmbeddedManager", ex);
+                if (node != null) {
+                    // Do not leave a started node behind when the manager could not be created.
+                    try {
+                        node.stop();
+                    } catch (Exception stopEx) {
+                        LOGGER.warn("Unable to stop Flume node after failed startup", stopEx);
+                    }
+                }
+            }
+            return null;
+        }
+
+        /**
+         * Build the Flume properties for the embedded agent. Every key is prefixed with the agent name.
+         * When agents are supplied, a file channel, one avro sink per agent and a failover sink group
+         * containing all of the sinks are generated. Otherwise the supplied properties are copied, the
+         * Log4j source is added and the channel defaults to "file" when none is specified.
+         * @param name The name of the Appender, used as the Flume agent name.
+         * @param agents The agents.
+         * @param properties The Flume configuration properties.
+         * @param batchSize The number of events to include in a batch.
+         * @return The Flume configuration properties. All values are Strings.
+         * @throws ConfigurationException if the configuration is missing, ambiguous or invalid.
+         */
+        private Properties createProperties(String name, Agent[] agents, Property[] properties, int batchSize) {
+            Properties props = new Properties();
+
+            if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
+                LOGGER.error("No Flume configuration provided");
+                throw new ConfigurationException("No Flume configuration provided");
+            }
+
+            if ((agents != null && agents.length > 0 && properties != null && properties.length > 0)) {
+                LOGGER.error("Agents and Flume configuration cannot both be specified");
+                throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
+            }
+
+            if (agents != null && agents.length > 0) {
+                String sourceChannels = "file";
+                props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
+                props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", sourceType);
+                props.put(name + ".channels", sourceChannels);
+                props.put(name + ".channels.file.type", "file");
+
+                StringBuilder sb = new StringBuilder();
+                String leading = "";
+                // The first agent gets the highest priority; each subsequent agent gets one less.
+                int priority = agents.length;
+                for (int i = 0; i < agents.length; ++i) {
+                    sb.append(leading).append("agent").append(i);
+                    leading = " ";
+                    String prefix = name + ".sinks.agent" + i;
+                    props.put(prefix + ".channel", "file");
+                    props.put(prefix + ".type", "avro");
+                    props.put(prefix + ".hostname", agents[i].getHost());
+                    // Properties values must be Strings to be visible through getProperty().
+                    props.put(prefix + ".port", String.valueOf(agents[i].getPort()));
+                    props.put(prefix + ".batch-size", Integer.toString(batchSize));
+                    props.put(name + ".sinkgroups.group1.processor.priority.agent" + i, Integer.toString(priority));
+                    --priority;
+                }
+                String sinkNames = sb.toString();
+                props.put(name + ".sinks", sinkNames);
+                props.put(name + ".sinkgroups", "group1");
+                // The group must list every sink, not just the last one.
+                props.put(name + ".sinkgroups.group1.sinks", sinkNames);
+                props.put(name + ".sinkgroups.group1.processor.type", "failover");
+                props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
+            } else {
+                String channels = null;
+                String[] sinks = null;
+
+                props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
+                props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", sourceType);
+
+                for (Property property : properties) {
+                    String key = property.getName();
+
+                    if (key == null || key.length() == 0) {
+                        String msg = "A property name must be provided";
+                        LOGGER.error(msg);
+                        throw new ConfigurationException(msg);
+                    }
+
+                    String upperKey = key.toUpperCase();
+
+                    // The agent name is added as a prefix below, so it must not be part of the key.
+                    if (upperKey.startsWith(name.toUpperCase())) {
+                        String msg = "Specification of the agent name is not allowed in Flume Appender configuration: " + key;
+                        LOGGER.error(msg);
+                        throw new ConfigurationException(msg);
+                    }
+
+                    // The only source is the Log4j source added above.
+                    if (upperKey.startsWith("SOURCES.")) {
+                        String msg = "Specification of Sources is not allowed in Flume Appender: " + key;
+                        LOGGER.error(msg);
+                        throw new ConfigurationException(msg);
+                    }
+
+                    String value = property.getValue();
+                    if (value == null || value.length() == 0) {
+                        String msg = "A value for property " + key + " must be provided";
+                        LOGGER.error(msg);
+                        throw new ConfigurationException(msg);
+                    }
+
+                    if (upperKey.equals("CHANNELS")) {
+                        channels = value.trim();
+                    } else if (upperKey.equals("SINKS")) {
+                        sinks = value.trim().split(" ");
+                    }
+
+                    props.put(name + "." + key, value);
+                }
+
+                String sourceChannels = channels;
+
+                if (channels == null) {
+                    sourceChannels = "file";
+                    props.put(name + ".channels", sourceChannels);
+                }
+
+                props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
+
+                if (sinks == null || sinks.length == 0) {
+                    String msg = "At least one Sink must be specified";
+                    LOGGER.error(msg);
+                    throw new ConfigurationException(msg);
+                }
+            }
+            return props;
+        }
+
+    }
+
+}

Added: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeManager.java?rev=1374701&view=auto
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeManager.java (added)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeManager.java Sun Aug 19 07:37:50 2012
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import org.apache.logging.log4j.core.appender.AbstractManager;
+
+/**
+ * Base class for the managers that deliver FlumeEvents on behalf of the Flume appender.
+ */
+public abstract class FlumeManager extends AbstractManager {
+
+    /**
+     * Constructor.
+     * @param name The name of this manager.
+     */
+    public FlumeManager(String name) {
+        super(name);
+    }
+
+    /**
+     * Send an event.
+     * @param event The event to send.
+     * @param delay NOTE(review): presumably the delay between retries - confirm against the implementations.
+     * @param retries NOTE(review): presumably the number of times to retry - confirm against the implementations.
+     */
+    public abstract void send(FlumeEvent event, int delay, int retries);
+}

Added: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeNode.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeNode.java?rev=1374701&view=auto
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeNode.java (added)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeNode.java Sun Aug 19 07:37:50 2012
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import com.google.common.base.Preconditions;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.lifecycle.LifecycleSupervisor;
+import org.apache.flume.node.NodeConfiguration;
+import org.apache.flume.node.NodeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * LifecycleAware wrapper that runs a NodeManager under a LifecycleSupervisor and keeps the
+ * NodeConfiguration the node was built from.
+ */
+public class FlumeNode implements LifecycleAware {
+
+    private static final Logger logger = LoggerFactory.getLogger(FlumeNode.class);
+
+    // Starts out as IDLE so that getLifecycleState() never returns null before start() is called.
+    private LifecycleState lifecycleState = LifecycleState.IDLE;
+    private final NodeManager nodeManager;
+    private final LifecycleSupervisor supervisor;
+    private final NodeConfiguration conf;
+
+    /**
+     * Constructor.
+     * @param manager The node manager to supervise.
+     * @param conf The configuration of the node.
+     */
+    public FlumeNode(NodeManager manager, NodeConfiguration conf) {
+        this.nodeManager = manager;
+        this.conf = conf;
+        supervisor = new LifecycleSupervisor();
+    }
+
+    /**
+     * Starts the supervisor and places the node manager under its supervision.
+     * @throws IllegalStateException if the node manager is null.
+     */
+    public void start() {
+
+        Preconditions.checkState(nodeManager != null,
+            "Node manager can not be null");
+
+        supervisor.start();
+
+        logger.info("Flume node starting");
+
+        supervisor.supervise(nodeManager,
+            new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+
+        lifecycleState = LifecycleState.START;
+    }
+
+    /**
+     * Stops the supervisor.
+     */
+    public void stop() {
+
+        logger.info("Flume node stopping");
+
+        supervisor.stop();
+
+        lifecycleState = LifecycleState.STOP;
+    }
+
+    /**
+     * Returns the node manager.
+     * @return The node manager passed to the constructor.
+     */
+    public NodeManager getNodeManager() {
+        return nodeManager;
+    }
+
+    /**
+     * Returns the node configuration.
+     * @return The configuration passed to the constructor.
+     */
+    public NodeConfiguration getConfiguration() {
+        return conf;
+    }
+
+    /**
+     * Returns the current state of the node.
+     * @return IDLE until start() is called, START after start() and STOP after stop().
+     */
+    public LifecycleState getLifecycleState() {
+        return lifecycleState;
+    }
+
+}

Added: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java?rev=1374701&view=auto
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java (added)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java Sun Aug 19 07:37:50 2012
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import org.apache.flume.ChannelException;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.flume.source.AbstractSource;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * Flume source that is fed FlumeEvents directly through {@link #send(FlumeEvent)} and passes them to
+ * its ChannelProcessor.
+ */
+public class Log4jEventSource extends AbstractSource implements EventDrivenSource {
+
+    private SourceCounter sourceCounter = new SourceCounter("log4j");
+
+    private static final Logger logger = LoggerFactory.getLogger(Log4jEventSource.class);
+
+    /**
+     * Constructor. Names the source "Log4jEvent".
+     */
+    public Log4jEventSource() {
+        setName("Log4jEvent");
+    }
+
+    @Override
+    public synchronized void start() {
+        super.start();
+
+        logger.info("Log4j Source started");
+    }
+
+    @Override
+    public synchronized void stop() {
+        super.stop();
+
+        logger.info("Log4j Source stopped. Metrics {}", sourceCounter);
+    }
+
+
+    /**
+     * Counts the event and hands it to the channel processor.
+     * @param event The event to process.
+     * @throws ChannelException if the channel processor fails; the failure is logged and rethrown.
+     */
+    public void send(FlumeEvent event) {
+        sourceCounter.incrementAppendReceivedCount();
+        sourceCounter.incrementEventReceivedCount();
+        try {
+            getChannelProcessor().processEvent(event);
+        } catch (ChannelException ex) {
+            // Concatenate the event: the (String, Throwable) overload does not substitute placeholders.
+            logger.warn("Unable to process event " + event, ex);
+            throw ex;
+        }
+    }
+}

Copied: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java (from r1371545, logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppenderTest.java)
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java?p2=logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java&p1=logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppenderTest.java&r1=1371545&r2=1374701&rev=1374701&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppenderTest.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java Sun Aug 19 07:37:50 2012
@@ -35,7 +35,6 @@ import org.apache.logging.log4j.LogManag
 import org.apache.logging.log4j.core.Appender;
 import org.apache.logging.log4j.core.Logger;
 import org.apache.logging.log4j.core.LoggerContext;
-import org.apache.logging.log4j.core.config.plugins.PluginManager;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -55,7 +54,7 @@ import java.util.zip.GZIPInputStream;
 /**
  *
  */
-public class FlumeAvroAppenderTest {
+public class FlumeAppenderTest {
 
     private static LoggerContext ctx;
 
@@ -124,8 +123,8 @@ public class FlumeAvroAppenderTest {
     @Test
     public void testLog4jAvroAppender() throws InterruptedException, IOException {
         Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
-        FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null,
-            null, null, null, null, "true", "1", null, null, null);
+        FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "100", "3", "avro", "false",
+            null, null, null, null, null, "true", "1", null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);
@@ -151,8 +150,8 @@ public class FlumeAvroAppenderTest {
     @Test
     public void testMultiple() throws InterruptedException, IOException {
         Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
-        FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null,
-            null, null, null, null, "true", "1", null, null, null);
+        FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "100", "3", "avro", "false",
+            null, null, null, null, null, "true", "1", null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);
@@ -181,8 +180,8 @@ public class FlumeAvroAppenderTest {
      @Test
     public void testBatch() throws InterruptedException, IOException {
         Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
-        FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null,
-            null, null, null, null, "true", "10", null, null, null);
+        FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "100", "3", "avro", "false",
+            null, null, null, null, null, "true", "10", null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);
@@ -212,8 +211,8 @@ public class FlumeAvroAppenderTest {
     @Test
     public void testConnectionRefused() {
         Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
-        FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null,
-            null, null, null, null, "true", "1", null, null, null);
+        FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "100", "3", "avro", "false",
+            null, null, null, null, null, "true", "1", null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);
@@ -239,8 +238,8 @@ public class FlumeAvroAppenderTest {
         String altPort = Integer.toString(Integer.parseInt(testPort) + 1);
         Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort),
                                       Agent.createAgent("localhost", altPort)};
-        FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null,
-            null, null, null, null, "true", "1", null, null, null);
+        FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "100", "3", "avro", "false",
+            null, null, null, null, null, "true", "1", null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);

Added: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java?rev=1374701&view=auto
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java (added)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java Sun Aug 19 07:37:50 2012
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.source.AvroSource;
+import org.apache.logging.log4j.EventLogger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.XMLConfigurationFactory;
+import org.apache.logging.log4j.message.StructuredDataMessage;
+import org.apache.logging.log4j.status.StatusLogger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * Tests the Flume appender configured through {@code default_embedded.xml}: events logged
+ * via {@link EventLogger} are expected to reach the Avro sources started by this test and
+ * to become available on the memory channel attached to those sources.
+ */
+public class FlumeEmbeddedAgentTest {
+    private static final String CONFIG = "default_embedded.xml";
+    private static LoggerContext ctx;
+
+    private static final int testServerPort = 12345;
+
+    private AvroSource primarySource;
+    private AvroSource altSource;
+    private Channel channel;
+
+    private String testPort;
+    private String altPort;
+
+    @BeforeClass
+    public static void setupClass() {
+        // System.setProperty(DefaultConfiguration.DEFAULT_LEVEL, Level.DEBUG.toString());
+    }
+
+    @AfterClass
+    public static void cleanupClass() {
+        StatusLogger.getLogger().reset();
+    }
+
+    /**
+     * Starts two Avro sources on localhost (primary on {@code testServerPort}, alternate on
+     * the next port) that feed one memory channel, then reconfigures Log4j from {@code CONFIG}.
+     */
+    @Before
+    public void setUp() throws Exception {
+        primarySource = new AvroSource();
+        primarySource.setName("Primary");
+        altSource = new AvroSource();
+        altSource.setName("Alternate");
+        channel = new MemoryChannel();
+
+        Configurables.configure(channel, new Context());
+
+        // The primary source listens on localhost:testServerPort.
+        Context context = new Context();
+        testPort = String.valueOf(testServerPort);
+        context.put("port", testPort);
+        context.put("bind", "localhost");
+        Configurables.configure(primarySource, context);
+
+        // The alternate source listens on the next port.
+        context = new Context();
+        altPort = String.valueOf(testServerPort + 1);
+        context.put("port", altPort);
+        context.put("bind", "localhost");
+        Configurables.configure(altSource, context);
+
+        // Both sources write to the same channel, so tests read events from a single place.
+        List<Channel> channels = new ArrayList<Channel>();
+        channels.add(channel);
+
+        ChannelSelector cs = new ReplicatingChannelSelector();
+        cs.setChannels(channels);
+
+        primarySource.setChannelProcessor(new ChannelProcessor(cs));
+        altSource.setChannelProcessor(new ChannelProcessor(cs));
+
+        primarySource.start();
+        altSource.start();
+
+        Assert.assertTrue("Reached start or error", LifecycleController.waitForOneOf(
+            primarySource, LifecycleState.START_OR_ERROR));
+        Assert.assertEquals("Server is started", LifecycleState.START, primarySource.getLifecycleState());
+        System.setProperty(XMLConfigurationFactory.CONFIGURATION_FILE_PROPERTY, CONFIG);
+        ctx = (LoggerContext) LogManager.getContext(false);
+        ctx.reconfigure();
+    }
+
+    /**
+     * Clears the configuration-file property, reconfigures Log4j, stops both sources and
+     * unregisters the Flume MBeans.
+     */
+    @After
+    public void teardown() throws Exception {
+        System.clearProperty(XMLConfigurationFactory.CONFIGURATION_FILE_PROPERTY);
+        ctx.reconfigure();
+        primarySource.stop();
+        altSource.stop();
+        Assert.assertTrue("Reached stop or error",
+            LifecycleController.waitForOneOf(primarySource, LifecycleState.STOP_OR_ERROR));
+        Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+            primarySource.getLifecycleState());
+        // Unregister every MBean in the org.apache.flume.* domains.
+        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+        Set<ObjectName> names = server.queryNames(new ObjectName("org.apache.flume.*:*"), null);
+        for (ObjectName name : names) {
+            try {
+                server.unregisterMBean(name);
+            } catch (Exception ex) {
+                System.out.println("Unable to unregister " + name.toString());
+            }
+        }
+    }
+
+    @Test
+    public void testLog4Event() throws InterruptedException, IOException {
+
+        StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Message", "Test");
+        EventLogger.logEvent(msg);
+
+        assertNextEventEndsWith("Test Message");
+
+        primarySource.stop();
+    }
+
+    @Test
+    public void testMultiple() throws InterruptedException, IOException {
+        logTestMessages(10);
+        assertTestMessagesReceived(10);
+
+        primarySource.stop();
+    }
+
+    @Test
+    public void testFailover() throws InterruptedException, IOException {
+        Logger logger = LogManager.getLogger("testFailover");
+        logger.debug("Starting testFailover");
+        logTestMessages(10);
+        assertTestMessagesReceived(10);
+
+        // Only the alternate source is left listening for the second batch.
+        primarySource.stop();
+
+        logTestMessages(10);
+        assertTestMessagesReceived(10);
+    }
+
+    /** Logs {@code count} events with messages "Test Message 0" .. "Test Message count-1". */
+    private void logTestMessages(int count) {
+        for (int i = 0; i < count; ++i) {
+            StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Message " + i, "Test");
+            EventLogger.logEvent(msg);
+        }
+    }
+
+    /** Takes {@code count} events from the channel and checks they arrive in logging order. */
+    private void assertTestMessagesReceived(int count) throws IOException {
+        for (int i = 0; i < count; ++i) {
+            assertNextEventEndsWith("Test Message " + i);
+        }
+    }
+
+    /**
+     * Takes one event from the channel inside its own transaction and asserts that its
+     * decompressed body ends with {@code expectedSuffix}.
+     */
+    private void assertNextEventEndsWith(String expectedSuffix) throws IOException {
+        Transaction transaction = channel.getTransaction();
+        transaction.begin();
+
+        Event event = channel.take();
+        Assert.assertNotNull(event);
+        String body = getBody(event);
+        Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
+            body.endsWith(expectedSuffix));
+        transaction.commit();
+        transaction.close();
+    }
+
+    /**
+     * Decompresses the GZIP-compressed body of an event.
+     *
+     * @param event the event whose body is GZIP-compressed text
+     * @return the decompressed body, decoded with the platform default charset
+     * @throws IOException if the body cannot be decompressed
+     */
+    private String getBody(Event event) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody()));
+        try {
+            int n;
+            while (-1 != (n = is.read())) {
+                baos.write(n);
+            }
+        } finally {
+            // Always release the inflater held by the GZIP stream, even if reading fails.
+            is.close();
+        }
+        return new String(baos.toByteArray());
+    }
+}

Added: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java?rev=1374701&view=auto
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java (added)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java Sun Aug 19 07:37:50 2012
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.source.AvroSource;
+import org.apache.logging.log4j.EventLogger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.XMLConfigurationFactory;
+import org.apache.logging.log4j.message.StructuredDataMessage;
+import org.apache.logging.log4j.status.StatusLogger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.zip.GZIPInputStream;
+
+/**
+ *
+ */
+public class FlumeEmbeddedAppenderTest {
+    private static final String CONFIG = "embedded.xml";
+    private static LoggerContext ctx;
+
+    private static final int testServerPort = 12345;
+
+    private AvroSource primarySource;
+    private AvroSource altSource;
+    private Channel channel;
+
+    private String testPort;
+    private String altPort;
+    private int counter;
+
+    @BeforeClass
+    public static void setupClass() {
+        // System.setProperty(DefaultConfiguration.DEFAULT_LEVEL, Level.DEBUG.toString());
+    }
+
+    @AfterClass
+    public static void cleanupClass() {
+        StatusLogger.getLogger().reset();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        primarySource = new AvroSource();
+        primarySource.setName("Primary");
+        altSource = new AvroSource();
+        altSource.setName("Alternate");
+        channel = new MemoryChannel();
+        channel.setName("Memory");
+        ++counter;
+
+        Configurables.configure(channel, new Context());
+
+        /*
+        * Clear out all other appenders associated with this logger to ensure we're
+        * only hitting the Avro appender.
+        */
+        Context context = new Context();
+        testPort = String.valueOf(testServerPort);
+        context.put("port", testPort);
+        context.put("bind", "localhost");
+        Configurables.configure(primarySource, context);
+
+        context = new Context();
+        altPort = String.valueOf(testServerPort + 1);
+        context.put("port", altPort);
+        context.put("bind", "localhost");
+        Configurables.configure(altSource, context);
+
+        List<Channel> channels = new ArrayList<Channel>();
+        channels.add(channel);
+
+        ChannelSelector cs = new ReplicatingChannelSelector();
+        cs.setChannels(channels);
+
+        primarySource.setChannelProcessor(new ChannelProcessor(cs));
+        altSource.setChannelProcessor(new ChannelProcessor(cs));
+
+        primarySource.start();
+        altSource.start();
+
+    	  Assert.assertTrue("Reached start or error", LifecycleController.waitForOneOf(
+            primarySource, LifecycleState.START_OR_ERROR));
+        Assert.assertEquals("Server is started", LifecycleState.START, primarySource.getLifecycleState());
+        System.setProperty(XMLConfigurationFactory.CONFIGURATION_FILE_PROPERTY, CONFIG);
+        ctx = (LoggerContext) LogManager.getContext(false);
+        ctx.reconfigure();
+    }
+
+    @After
+    public void teardown() throws Exception {
+        System.clearProperty(XMLConfigurationFactory.CONFIGURATION_FILE_PROPERTY);
+        ctx.reconfigure();
+        primarySource.stop();
+        altSource.stop();
+	      Assert.assertTrue("Reached stop or error",
+	           LifecycleController.waitForOneOf(primarySource, LifecycleState.STOP_OR_ERROR));
+	      Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+            primarySource.getLifecycleState());
+        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+        Set<ObjectName> names = server.queryNames(new ObjectName("org.apache.flume.*:*"), null);
+        for (ObjectName name : names) {
+            try {
+                server.unregisterMBean(name);
+            } catch (Exception ex) {
+                System.out.println("Unable to unregister " + name.toString());
+            }
+        }
+    }
+
+    @Test
+    public void testLog4Event() throws InterruptedException, IOException {
+
+        StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Message", "Test");
+        EventLogger.logEvent(msg);
+
+        Transaction transaction = channel.getTransaction();
+        transaction.begin();
+
+        Event event = channel.take();
+   	    Assert.assertNotNull(event);
+        String body = getBody(event);
+  	    Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
+            body.endsWith("Test Message"));
+	      transaction.commit();
+	      transaction.close();
+
+	      primarySource.stop();
+    }
+
+    @Test
+    public void testMultiple() throws InterruptedException, IOException {
+
+        for (int i = 0; i < 10; ++i) {
+            StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Message " + i, "Test");
+            EventLogger.logEvent(msg);
+        }
+        for (int i = 0; i < 10; ++i) {
+            Transaction transaction = channel.getTransaction();
+            transaction.begin();
+
+            Event event = channel.take();
+            Assert.assertNotNull(event);
+            String body = getBody(event);
+            Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
+                body.endsWith("Test Message " + i));
+            transaction.commit();
+            transaction.close();
+        }
+
+        primarySource.stop();
+    }
+
+
+    @Test
+    public void testFailover() throws InterruptedException, IOException {
+        Logger logger = LogManager.getLogger("testFailover");
+        logger.debug("Starting testFailover");
+        for (int i = 0; i < 10; ++i) {
+            StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Message " + i, "Test");
+            EventLogger.logEvent(msg);
+        }
+        for (int i = 0; i < 10; ++i) {
+            Transaction transaction = channel.getTransaction();
+            transaction.begin();
+
+            Event event = channel.take();
+            Assert.assertNotNull(event);
+            String body = getBody(event);
+            Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
+                body.endsWith("Test Message " + i));
+            transaction.commit();
+            transaction.close();
+        }
+
+        primarySource.stop();
+
+
+        for (int i = 0; i < 10; ++i) {
+            StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Message " + i, "Test");
+            EventLogger.logEvent(msg);
+        }
+        for (int i = 0; i < 10; ++i) {
+            Transaction transaction = channel.getTransaction();
+            transaction.begin();
+
+            Event event = channel.take();
+            Assert.assertNotNull(event);
+            String body = getBody(event);
+            Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
+                body.endsWith("Test Message " + i));
+            transaction.commit();
+            transaction.close();
+        }
+    }
+
+
+    private String getBody(Event event) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody()));
+            int n = 0;
+            while (-1 != (n = is.read())) {
+                baos.write(n);
+            }
+            return new String(baos.toByteArray());
+
+    }
+}

Added: logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml?rev=1374701&view=auto
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml (added)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml Sun Aug 19 07:37:50 2012
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration status="warn" name="MyApp" packages="">
+  <appenders>
+    <Flume name="eventLogger" suppressExceptions="false" compress="true">
+      <Agent host="localhost" port="12345"/>
+      <Agent host="localhost" port="12346"/>
+      <RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>
+    </Flume>
+    <Console name="STDOUT">
+      <PatternLayout pattern="%d [%p] %c %m%n"/>
+    </Console>
+  </appenders>
+  <loggers>
+    <logger name="EventLogger" level="info">
+      <appender-ref ref="eventLogger"/>
+    </logger>
+    <root level="warn">
+      <appender-ref ref="STDOUT"/>
+    </root>
+  </loggers>
+</configuration>
\ No newline at end of file

Added: logging/log4j/log4j2/trunk/flume-ng/src/test/resources/embedded.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/resources/embedded.xml?rev=1374701&view=auto
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/resources/embedded.xml (added)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/resources/embedded.xml Sun Aug 19 07:37:50 2012
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration status="error" name="MyApp" packages="">
+  <appenders>
+    <Flume name="eventLogger" suppressExceptions="false" compress="true" embedded="true">
+      <Property name="channels">file</Property>
+      <Property name="channels.file.type">file</Property>
+      <Property name="channels.file.checkpointDir">target/file-channel/checkpoint</Property>
+      <Property name="channels.file.dataDirs">target/file-channel/data</Property>
+      <Property name="sinks">agent1 agent2</Property>
+      <Property name="sinks.agent1.channel">file</Property>
+      <Property name="sinks.agent1.type">avro</Property>
+      <Property name="sinks.agent1.hostname">localhost</Property>
+      <Property name="sinks.agent1.port">12345</Property>
+      <Property name="sinks.agent1.batch-size">1</Property>
+      <Property name="sinks.agent2.channel">file</Property>
+      <Property name="sinks.agent2.type">avro</Property>
+      <Property name="sinks.agent2.hostname">localhost</Property>
+      <Property name="sinks.agent2.port">12346</Property>
+      <Property name="sinks.agent2.batch-size">1</Property>
+      <Property name="sinkgroups">group1</Property>
+      <Property name="sinkgroups.group1.sinks">agent1 agent2</Property>
+      <Property name="sinkgroups.group1.processor.type">failover</Property>
+      <Property name="sinkgroups.group1.processor.priority.agent1">10</Property>
+      <Property name="sinkgroups.group1.processor.priority.agent2">5</Property>
+      <RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>
+    </Flume>
+    <Console name="STDOUT">
+      <PatternLayout pattern="%d [%p] %c %m%n"/>
+    </Console>
+  </appenders>
+  <loggers>
+    <logger name="EventLogger" level="info">
+      <appender-ref ref="eventLogger"/>
+    </logger>
+    <root level="warn">
+      <appender-ref ref="STDOUT"/>
+    </root>
+  </loggers>
+</configuration>
\ No newline at end of file

Modified: logging/log4j/log4j2/trunk/pom.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/pom.xml?rev=1374701&r1=1374700&r2=1374701&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/pom.xml (original)
+++ logging/log4j/log4j2/trunk/pom.xml Sun Aug 19 07:37:50 2012
@@ -158,6 +158,11 @@
         <artifactId>log4j-jcl</artifactId>
         <version>${project.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j12-api</artifactId>
+        <version>${project.version}</version>
+      </dependency>
  	    <dependency>
         <groupId>javax.servlet</groupId>
         <artifactId>servlet-api</artifactId>

Modified: logging/log4j/log4j2/trunk/src/changes/changes.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/src/changes/changes.xml?rev=1374701&r1=1374700&r2=1374701&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/src/changes/changes.xml (original)
+++ logging/log4j/log4j2/trunk/src/changes/changes.xml Sun Aug 19 07:37:50 2012
@@ -23,7 +23,10 @@
 
   <body>
     <release version="2.0-alpha2" date="TBD" description="Bug fixes and minor enhancements">
-      <action dev="rgoers" type="add">
+      <action issue="LOG4J2-69" dev="rgoers" type="add">
+        Allow Flume agents to be embedded into the Flume Appender.
+      </action>
+      <action issue="LOG4J2-68" dev="rgoers" type="add">
         Add support for formatting using String.format().
       </action>
       <action issue="LOG4J2-67" dev="rgoers" type="add">