You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by rg...@apache.org on 2012/08/20 01:07:48 UTC

svn commit: r1374875 - in /logging/log4j/log4j2/trunk: flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/ flume-ng/src/test/resources/ src/site/xdoc/manual/

Author: rgoers
Date: Sun Aug 19 23:07:48 2012
New Revision: 1374875

URL: http://svn.apache.org/viewvc?rev=1374875&view=rev
Log:
Fix Embedded Agent properties. Document embedded Flume agents

Modified:
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeConfigurationBuilder.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
    logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml
    logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeConfigurationBuilder.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeConfigurationBuilder.java?rev=1374875&r1=1374874&r2=1374875&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeConfigurationBuilder.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeConfigurationBuilder.java Sun Aug 19 23:07:48 2012
@@ -56,6 +56,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
 
 /**
  * See Flume's PropertiesFileConfigurationProvider. This class would extend that if it were possible.
@@ -71,31 +75,36 @@ public class FlumeConfigurationBuilder {
 
     public NodeConfiguration load(String name, Properties props, NodeConfigurationAware configurationAware) {
         NodeConfiguration conf = new SimpleNodeConfiguration();
-        FlumeConfiguration fconfig = new FlumeConfiguration(props);
-        List<FlumeConfigurationError> errors = fconfig.getConfigurationErrors();
-        if (errors.size() > 0) {
-            boolean isError = false;
-            for (FlumeConfigurationError error : errors) {
-                StringBuilder sb = new StringBuilder();
-                sb.append(error.getComponentName()).append(" ").append(error.getKey()).append(" ");
-                sb.append(error.getErrorType().name()).append(" - ").append(error.getErrorType().getError());
-                switch (error.getErrorOrWarning()) {
-                    case ERROR:
-                        isError = true;
-                        LOGGER.error(sb.toString());
-                        break;
-                    case WARNING:
-                        LOGGER.warn(sb.toString());
-                        break;
+        FlumeConfiguration fconfig;
+        try {
+            fconfig = new FlumeConfiguration(props);
+            List<FlumeConfigurationError> errors = fconfig.getConfigurationErrors();
+            if (errors.size() > 0) {
+                boolean isError = false;
+                for (FlumeConfigurationError error : errors) {
+                    StringBuilder sb = new StringBuilder();
+                    sb.append("Component: ").append(error.getComponentName()).append(" ");
+                    sb.append("Key: ").append(error.getKey()).append(" ");
+                    sb.append(error.getErrorType().name()).append(" - ").append(error.getErrorType().getError());
+                    switch (error.getErrorOrWarning()) {
+                        case ERROR:
+                            isError = true;
+                            LOGGER.error(sb.toString());
+                            break;
+                        case WARNING:
+                            LOGGER.warn(sb.toString());
+                            break;
+                    }
                 }
-            }
-            if (isError) {
-                for (String key : props.stringPropertyNames()) {
-                    LOGGER.error(key + "=" + props.getProperty(key));
+                if (isError) {
+                    throw new ConfigurationException("Unable to configure Flume due to errors");
                 }
-                throw new ConfigurationException("Unable to configure Flume due to errors");
             }
+        } catch (RuntimeException ex) {
+            printProps(props);
+            throw ex;
         }
+
         FlumeConfiguration.AgentConfiguration agentConf = fconfig.getConfigurationFor(name);
 
         if (agentConf != null) {
@@ -111,6 +120,12 @@ public class FlumeConfigurationBuilder {
         return conf;
     }
 
+    private void printProps(Properties props) {
+        for (String key : new TreeSet<String>(props.stringPropertyNames())) {
+            LOGGER.error(key + "=" + props.getProperty(key));
+        }
+    }
+
     protected void loadChannels(FlumeConfiguration.AgentConfiguration agentConf, NodeConfiguration conf) {
         LOGGER.info("Creating channels");
         Set<String> channels = agentConf.getChannelSet();

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java?rev=1374875&r1=1374874&r2=1374875&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java Sun Aug 19 23:07:48 2012
@@ -212,18 +212,18 @@ public class FlumeEmbeddedManager extend
                 for (int i=0; i < agents.length; ++i) {
                     sb.append(leading).append("agent").append(i);
                     leading = " ";
-                    String prefix = name + "sinks.agent" + i;
+                    String prefix = name + ".sinks.agent" + i;
                     props.put(prefix + ".channel", "file");
                     props.put(prefix + ".type", "avro");
                     props.put(prefix + ".hostname", agents[i].getHost());
-                    props.put(prefix + ".port", agents[i].getPort());
-                    props.put(prefix + ".batch-size", batchSize);
-                    props.put(name + ".sinkgroups.group1.sinks", "agent" +i);
+                    props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
+                    props.put(prefix + ".batch-size", Integer.toString(batchSize));
                     props.put(name + ".sinkgroups.group1.processor.priority.agent" + i, Integer.toString(priority));
                     --priority;
                 }
                 props.put(name + ".sinks", sb.toString());
                 props.put(name + ".sinkgroups", "group1");
+                props.put(name + ".sinkgroups.group1.sinks", sb.toString());
                 props.put(name + ".sinkgroups.group1.processor.type", "failover");
                 String sourceChannels = "file";
                 props.put(name + ".channels", sourceChannels);

Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml?rev=1374875&r1=1374874&r2=1374875&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml Sun Aug 19 23:07:48 2012
@@ -1,7 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <configuration status="warn" name="MyApp" packages="">
   <appenders>
-    <Flume name="eventLogger" suppressExceptions="false" compress="true">
+    <Flume name="eventLogger" suppressExceptions="false" compress="true" embedded="true">
       <Agent host="localhost" port="12345"/>
       <Agent host="localhost" port="12346"/>
       <RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>

Modified: logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml?rev=1374875&r1=1374874&r2=1374875&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml (original)
+++ logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml Sun Aug 19 23:07:48 2012
@@ -275,19 +275,32 @@
   ]]></source>
           </p>
         </subsection>
-        <a name="FlumeAvroAppender"/>
-        <subsection name="FlumeAvroAppender">
+        <a name="FlumeAppender"/>
+        <subsection name="FlumeAppender">
           <p><i>This is an optional component supplied in a separate jar.</i></p>
           <p><a href="http://incubator.apache.org/projects/flume.html">Apache Flume</a> is a distributed, reliable,
             and available system for efficiently collecting, aggregating, and moving large amounts of log data
             from many different sources to a centralized data store. The FlumeAppender takes LogEvents and sends
             them to a Flume agent as serialized Avro events for consumption.</p>
-          <p>
+          <p><i>Note:</i><br/>
             There are two versions of the Flume Appender in the source control. The first is for "Flume OG", the
             original version of Flume before it became an Apache project. The second is for "Flume NG", which is
             maintained by the Apache Flume project. Only the Flume NG appender is included in Log4j 2 releases as
             Flume OG itself is considered deprecated.
           </p>
+          <p>
+            The Flume Appender supports two modes of operation.
+            <ol>
+              <li>It can act as a remote Flume client which sends Flume events via Avro to a Flume Agent configured
+              with an Avro Source.</li>
+              <li>It can act as an embedded Flume Agent where Flume events pass directly into Flume for processing.</li>
+            </ol>
+            Usage as an embedded agent will cause the messages to be directly passed to the Flume Channel and then
+            control will be immediately returned to the application. All interaction with remote agents will occur
+            asynchronously. Setting the "embedded" attribute to "true" will force the use of the embedded agent. In
+            addition, configuring agent properties in the appender configuration will also cause the embedded agent
+            to be used.
+          </p>
           <table border="1" width="100%">
             <tr>
               <th>Parameter Name</th>
@@ -299,7 +312,9 @@
               <td>Agent[]</td>
               <td>An array of Agents to which the logging events should be sent. If more than one agent is specified
                 the first Agent will be the primary and subsequent Agents will be used in the order specified as
-                secondaries should the primary Agent fail. Each Agent definition supplies the Agents host and port.</td>
+                secondaries should the primary Agent fail. Each Agent definition supplies the Agents host and port.
+                The specification of agents and properties are mutually exclusive. If both are configured an
+                error will result.</td>
             </tr>
             <tr>
               <td>agentRetries</td>
@@ -318,6 +333,13 @@
               <td>When set to true the message body will be compressed using gzip</td>
             </tr>
             <tr>
+              <td>embedded</td>
+              <td>boolean</td>
+              <td>When set to true the embedded Flume agent will be used. When Agent elements are used the events
+                will be sent to a file channel and then routed to a FailoverSinkProcessor which will use
+                each configured agent in the order they are declared.</td>
+            </tr>
+            <tr>
               <td>filter</td>
               <td>Filter</td>
               <td>A Filter to determine if the event should be handled by this Appender. More than one Filter
@@ -371,6 +393,14 @@
               <td>The name of the Appender.</td>
             </tr>
             <tr>
+              <td>properties</td>
+              <td>Property[]</td>
+              <td>One or more Property elements that are used to configure the Flume Agent. The properties must be
+                configured without the agent name (the appender name is used for this) and no sources can be
+                configured. All other Flume configuration properties are allowed. Specifying both Agent and Property
+                elements will result in an error.</td>
+            </tr>
+            <tr>
               <td>reconnectionDelay</td>
               <td>integer</td>
               <td>The number of milliseconds the application should wait before trying again to connect to the
@@ -386,7 +416,7 @@
             <caption align="top">FlumeAvroAppender Parameters</caption>
           </table>
             <p>
-              A sample FlumeAvroAppender configuration that is configured with a primary and a secondary agent,
+              A sample FlumeAppender configuration that is configured with a primary and a secondary agent,
               compresses the body, and formats the body using the RFC5424Layout:
 
             <source><![CDATA[<?xml version="1.0" encoding="UTF-8"?>
@@ -406,6 +436,78 @@
 </configuration>
   ]]></source>
           </p>
+          <p>
+            A sample FlumeAppender configuration that is configured with a primary and a secondary agent,
+            compresses the body, formats the body using RFC5424Layout and passes the events to an embedded Flume
+            Agent.
+          </p>
+          <source><![CDATA[<?xml version="1.0" encoding="UTF-8"?>
+<configuration status="warn" name="MyApp" packages="">
+  <appenders>
+    <Flume name="eventLogger" suppressExceptions="false" compress="true" embedded="true">
+      <Agent host="192.168.10.101" port="8800"/>
+      <Agent host="192.168.10.102" port="8800"/>
+      <RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>
+    </Flume>
+    <Console name="STDOUT">
+      <PatternLayout pattern="%d [%p] %c %m%n"/>
+    </Console>
+  </appenders>
+  <loggers>
+    <logger name="EventLogger" level="info">
+      <appender-ref ref="eventLogger"/>
+    </logger>
+    <root level="warn">
+      <appender-ref ref="STDOUT"/>
+    </root>
+  </loggers>
+</configuration>
+   ]]></source>
+          <p>
+            A sample FlumeAppender configuration that is configured with a primary and a secondary agent using
+            Flume configuration properties, compresses the body, formats the body using RFC5424Layout and passes the
+            events to an embedded Flume Agent.
+          </p>
+          <source><![CDATA[<?xml version="1.0" encoding="UTF-8"?>
+<configuration status="error" name="MyApp" packages="">
+  <appenders>
+    <Flume name="eventLogger" suppressExceptions="false" compress="true" embedded="true">
+      <Property name="channels">file</Property>
+      <Property name="channels.file.type">file</Property>
+      <Property name="channels.file.checkpointDir">target/file-channel/checkpoint</Property>
+      <Property name="channels.file.dataDirs">target/file-channel/data</Property>
+      <Property name="sinks">agent1 agent2</Property>
+      <Property name="sinks.agent1.channel">file</Property>
+      <Property name="sinks.agent1.type">avro</Property>
+      <Property name="sinks.agent1.hostname">192.168.10.101</Property>
+      <Property name="sinks.agent1.port">8800</Property>
+      <Property name="sinks.agent1.batch-size">100</Property>
+      <Property name="sinks.agent2.channel">file</Property>
+      <Property name="sinks.agent2.type">avro</Property>
+      <Property name="sinks.agent2.hostname">192.168.10.102</Property>
+      <Property name="sinks.agent2.port">8800</Property>
+      <Property name="sinks.agent2.batch-size">100</Property>
+      <Property name="sinkgroups">group1</Property>
+      <Property name="sinkgroups.group1.sinks">agent1 agent2</Property>
+      <Property name="sinkgroups.group1.processor.type">failover</Property>
+      <Property name="sinkgroups.group1.processor.priority.agent1">10</Property>
+      <Property name="sinkgroups.group1.processor.priority.agent2">5</Property>
+      <RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>
+    </Flume>
+    <Console name="STDOUT">
+      <PatternLayout pattern="%d [%p] %c %m%n"/>
+    </Console>
+  </appenders>
+  <loggers>
+    <logger name="EventLogger" level="info">
+      <appender-ref ref="eventLogger"/>
+    </logger>
+    <root level="warn">
+      <appender-ref ref="STDOUT"/>
+    </root>
+  </loggers>
+</configuration>
+   ]]></source>
         </subsection>
         <a name="JMSQueueAppender"/>
         <subsection name="JMSQueueAppender">