You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by rg...@apache.org on 2012/08/19 09:37:51 UTC
svn commit: r1374701 - in /logging/log4j/log4j2/trunk: ./
core/src/main/java/org/apache/logging/log4j/core/
core/src/main/java/org/apache/logging/log4j/core/config/plugins/
core/src/main/java/org/apache/logging/log4j/core/impl/
core/src/main/java/org/a...
Author: rgoers
Date: Sun Aug 19 07:37:50 2012
New Revision: 1374701
URL: http://svn.apache.org/viewvc?rev=1374701&view=rev
Log:
Fix FLUME-69, Allow Flume agent to be embedded into the Flume appender.
Added:
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
- copied, changed from r1371545, logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppender.java
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeConfigurationBuilder.java
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeManager.java
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeNode.java
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java
logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
- copied, changed from r1371545, logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppenderTest.java
logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java
logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
logging/log4j/log4j2/trunk/flume-ng/src/test/resources/
logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml
logging/log4j/log4j2/trunk/flume-ng/src/test/resources/embedded.xml
Removed:
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppender.java
logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppenderTest.java
Modified:
logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/LoggerContext.java
logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/config/plugins/PluginManager.java
logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/impl/Log4jContextFactory.java
logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/layout/RFC5424Layout.java
logging/log4j/log4j2/trunk/flume-ng/pom.xml
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
logging/log4j/log4j2/trunk/pom.xml
logging/log4j/log4j2/trunk/src/changes/changes.xml
Modified: logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/LoggerContext.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/LoggerContext.java?rev=1374701&r1=1374700&r2=1374701&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/LoggerContext.java (original)
+++ logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/LoggerContext.java Sun Aug 19 07:37:50 2012
@@ -27,6 +27,8 @@ import java.io.File;
import java.net.URI;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
/**
* The LoggerContext is the anchor for the logging system. It maintains a list of all the loggers requested by
@@ -51,7 +53,17 @@ public class LoggerContext implements or
private final URI configLocation;
- private boolean isStarted;
+ public enum Status {
+ INITIALIZED,
+ STARTING,
+ STARTED,
+ STOPPING,
+ STOPPED
+ }
+
+ private volatile Status status = Status.INITIALIZED;
+
+ private Lock configLock = new ReentrantLock();
/**
* Constructor taking only a name.
@@ -106,19 +118,38 @@ public class LoggerContext implements or
}
public void start() {
- reconfigure();
- isStarted = true;
+ if (configLock.tryLock()) {
+ try {
+ if (status == Status.INITIALIZED) {
+ status = Status.STARTING;
+ reconfigure();
+ status = Status.STARTED;
+ }
+ } finally {
+ configLock.unlock();
+ }
+ }
+ }
+
+ public void stop() {
+ configLock.lock();
+ try {
+ status = Status.STOPPING;
+ updateLoggers(new NullConfiguration());
+ config.stop();
+ externalContext = null;
+ status = Status.STOPPED;
+ } finally {
+ configLock.unlock();
+ }
}
- public synchronized void stop() {
- isStarted = false;
- updateLoggers(new NullConfiguration());
- config.stop();
- externalContext = null;
+ public Status getStatus() {
+ return status;
}
public boolean isStarted() {
- return isStarted;
+ return status == Status.STARTED;
}
/**
Modified: logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/config/plugins/PluginManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/config/plugins/PluginManager.java?rev=1374701&r1=1374700&r2=1374701&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/config/plugins/PluginManager.java (original)
+++ logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/config/plugins/PluginManager.java Sun Aug 19 07:37:50 2012
@@ -209,7 +209,10 @@ public class PluginManager {
for (int j = 0; j < count; ++j) {
String type = dis.readUTF();
int entries = dis.readInt();
- ConcurrentMap<String, PluginType> types = new ConcurrentHashMap<String, PluginType>(count);
+ ConcurrentMap<String, PluginType> types = map.get(type);
+ if (types == null) {
+ types = new ConcurrentHashMap<String, PluginType>(count);
+ }
for (int i = 0; i < entries; ++i) {
String key = dis.readUTF();
String className = dis.readUTF();
Modified: logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/impl/Log4jContextFactory.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/impl/Log4jContextFactory.java?rev=1374701&r1=1374700&r2=1374701&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/impl/Log4jContextFactory.java (original)
+++ logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/impl/Log4jContextFactory.java Sun Aug 19 07:37:50 2012
@@ -33,8 +33,6 @@ public class Log4jContextFactory impleme
private StatusLogger logger = StatusLogger.getLogger();
- private ThreadLocal<Log4jContextFactory> recursive = new ThreadLocal<Log4jContextFactory>();
-
/**
* Constructor that initializes the ContextSelector.
*/
@@ -72,17 +70,9 @@ public class Log4jContextFactory impleme
*/
public LoggerContext getContext(String fqcn, boolean currentContext) {
LoggerContext ctx = selector.getContext(fqcn, currentContext);
- synchronized (ctx) {
- if (recursive.get() != null || ctx.isStarted()) {
- return ctx;
- }
- try {
- recursive.set(this);
- ctx.start();
- return ctx;
- } finally {
- recursive.remove();
- }
+ if (ctx.getStatus() == LoggerContext.Status.INITIALIZED) {
+ ctx.start();
}
+ return ctx;
}
}
Modified: logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/layout/RFC5424Layout.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/layout/RFC5424Layout.java?rev=1374701&r1=1374700&r2=1374701&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/layout/RFC5424Layout.java (original)
+++ logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/layout/RFC5424Layout.java Sun Aug 19 07:37:50 2012
@@ -30,10 +30,13 @@ import org.apache.logging.log4j.message.
import org.apache.logging.log4j.message.StructuredDataMessage;
import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Calendar;
+import java.util.Enumeration;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Map;
@@ -228,6 +231,25 @@ public final class RFC5424Layout extends
InetAddress addr = InetAddress.getLocalHost();
return addr.getHostName();
} catch (UnknownHostException uhe) {
+ try {
+ Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
+ while (interfaces.hasMoreElements()) {
+ NetworkInterface nic = interfaces.nextElement();
+ Enumeration<InetAddress> addresses = nic.getInetAddresses();
+ while (addresses.hasMoreElements()) {
+ InetAddress address = addresses.nextElement();
+ if (!address.isLoopbackAddress()) {
+ String hostname = address.getHostName();
+ if (hostname != null) {
+ return hostname;
+ }
+ }
+ }
+ }
+ } catch (SocketException se) {
+ LOGGER.error("Could not determine local host name", uhe);
+ return "UNKNOWN_LOCALHOST";
+ }
LOGGER.error("Could not determine local host name", uhe);
return "UNKNOWN_LOCALHOST";
}
Modified: logging/log4j/log4j2/trunk/flume-ng/pom.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/pom.xml?rev=1374701&r1=1374700&r2=1374701&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/pom.xml (original)
+++ logging/log4j/log4j2/trunk/flume-ng/pom.xml Sun Aug 19 07:37:50 2012
@@ -32,6 +32,7 @@
<log4jParentDir>${basedir}/..</log4jParentDir>
<docLabel>Flume Documentation</docLabel>
<projectDir>/flume-ng</projectDir>
+ <flumeVersion>1.2.0</flumeVersion>
</properties>
<dependencies>
<dependency>
@@ -47,6 +48,10 @@
<artifactId>slf4j-impl</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j12-api</artifactId>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.7</version>
@@ -55,14 +60,45 @@
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
- <version>1.2.0</version>
+ <version>${flumeVersion}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-node</artifactId>
+ <version>${flumeVersion}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.flume.flume-ng-channels</groupId>
+ <artifactId>flume-file-channel</artifactId>
+ <version>${flumeVersion}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>1.0.1</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
Copied: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java (from r1371545, logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppender.java)
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java?p2=logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java&p1=logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppender.java&r1=1371545&r2=1374701&rev=1374701&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppender.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java Sun Aug 19 07:37:50 2012
@@ -20,25 +20,20 @@ import org.apache.logging.log4j.core.Fil
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AppenderBase;
+import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttr;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.layout.RFC5424Layout;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.util.Enumeration;
-
/**
* An Appender that uses the Avro protocol to route events to Flume.
*/
@Plugin(name = "Flume", type = "Core", elementType = "appender", printObject = true)
-public final class FlumeAvroAppender extends AppenderBase implements FlumeEventFactory {
+public final class FlumeAppender extends AppenderBase implements FlumeEventFactory {
- private FlumeAvroManager manager;
+ private FlumeManager manager;
private final String mdcIncludes;
private final String mdcExcludes;
@@ -50,18 +45,16 @@ public final class FlumeAvroAppender ext
private final boolean compressBody;
- private final String hostname;
-
private final int reconnectDelay;
private final int retries;
private final FlumeEventFactory factory;
- private FlumeAvroAppender(String name, Filter filter, Layout layout, boolean handleException,
- String hostname, String includes, String excludes, String required, String mdcPrefix,
- String eventPrefix, boolean compress, int delay, int retries,
- FlumeEventFactory factory, FlumeAvroManager manager) {
+ private FlumeAppender(String name, Filter filter, Layout layout, boolean handleException,
+ String includes, String excludes, String required, String mdcPrefix,
+ String eventPrefix, boolean compress, int delay, int retries,
+ FlumeEventFactory factory, FlumeManager manager) {
super(name, filter, layout, handleException);
this.manager = manager;
this.mdcIncludes = includes;
@@ -70,7 +63,6 @@ public final class FlumeAvroAppender ext
this.eventPrefix = eventPrefix;
this.mdcPrefix = mdcPrefix;
this.compressBody = compress;
- this.hostname = hostname;
this.reconnectDelay = delay;
this.retries = retries;
this.factory = factory == null ? this : factory;
@@ -131,7 +123,9 @@ public final class FlumeAvroAppender ext
* @return A Flume Avro Appender.
*/
@PluginFactory
- public static FlumeAvroAppender createAppender(@PluginElement("agents") Agent[] agents,
+ public static FlumeAppender createAppender(@PluginElement("agents") Agent[] agents,
+ @PluginElement("properties") Property[] properties,
+ @PluginAttr("embedded") String embedded,
@PluginAttr("reconnectionDelay") String delay,
@PluginAttr("agentRetries") String agentRetries,
@PluginAttr("name") String name,
@@ -147,18 +141,8 @@ public final class FlumeAvroAppender ext
@PluginElement("layout") Layout layout,
@PluginElement("filters") Filter filter) {
- String hostname;
- try {
- hostname = getHostName();
- } catch (Exception ex) {
- LOGGER.error("Unable to determine local hostname", ex);
- return null;
- }
- if (agents == null || agents.length == 0) {
- LOGGER.debug("No agents provided, using defaults");
- agents = new Agent[] {Agent.createAgent(null, null)};
- }
-
+ boolean embed = embedded != null ? Boolean.valueOf(embedded) :
+ (agents == null || agents.length == 0) && properties != null && properties.length > 0;
boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody);
@@ -176,37 +160,23 @@ public final class FlumeAvroAppender ext
return null;
}
- FlumeAvroManager manager = FlumeAvroManager.getManager(agents, batchCount);
- if (manager == null) {
- return null;
- }
+ FlumeManager manager;
- return new FlumeAvroAppender(name, filter, layout, handleExceptions, hostname, includes,
- excludes, required, mdcPrefix, eventPrefix, compress, reconnectDelay, retries, factory, manager);
- }
-
- private static String getHostName() throws Exception {
- try {
- return InetAddress.getLocalHost().getHostName();
- } catch (Exception ex) {
- // Could not locate host the easy way.
+ if (embed) {
+ manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount);
+ } else {
+ if (agents == null || agents.length == 0) {
+ LOGGER.debug("No agents provided, using defaults");
+ agents = new Agent[] {Agent.createAgent(null, null)};
+ }
+ manager = FlumeAvroManager.getManager(name, agents, batchCount);
}
- Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
- while (interfaces.hasMoreElements()) {
- NetworkInterface nic = interfaces.nextElement();
- Enumeration<InetAddress> addresses = nic.getInetAddresses();
- while (addresses.hasMoreElements()) {
- InetAddress address = addresses.nextElement();
- if (!address.isLoopbackAddress()) {
- String hostname = address.getHostName();
- if (hostname != null) {
- return hostname;
- }
- }
- }
+ if (manager == null) {
+ return null;
}
- throw new UnknownHostException("Unable to determine host name");
+ return new FlumeAppender(name, filter, layout, handleExceptions, includes,
+ excludes, required, mdcPrefix, eventPrefix, compress, reconnectDelay, retries, factory, manager);
}
}
Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java?rev=1374701&r1=1374700&r2=1374701&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java Sun Aug 19 07:37:50 2012
@@ -38,7 +38,7 @@ import java.util.Map;
/**
* Manager for FlumeAvroAppenders.
*/
-public class FlumeAvroManager extends AbstractManager {
+public class FlumeAvroManager extends FlumeManager {
/**
The default reconnection delay (500 milliseconds or .5 seconds).
@@ -67,7 +67,7 @@ public class FlumeAvroManager extends Ab
* @param agents An array of Agents.
* @param batchSize The number of evetns to include in a batch.
*/
- protected FlumeAvroManager(String name, Agent[] agents, int batchSize) {
+ protected FlumeAvroManager(String name, String shortName, Agent[] agents, int batchSize) {
super(name);
this.agents = agents;
this.batchSize = batchSize;
@@ -80,7 +80,7 @@ public class FlumeAvroManager extends Ab
* @param batchSize The number of events to include in a batch.
* @return A FlumeAvroManager.
*/
- public static FlumeAvroManager getManager(Agent[] agents, int batchSize) {
+ public static FlumeAvroManager getManager(String name, Agent[] agents, int batchSize) {
if (agents == null || agents.length == 0) {
throw new IllegalArgumentException("At least one agent is required");
}
@@ -99,7 +99,7 @@ public class FlumeAvroManager extends Ab
first = false;
}
sb.append("]");
- return (FlumeAvroManager) getManager(sb.toString(), factory, new FactoryData(agents, batchSize));
+ return (FlumeAvroManager) getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize));
}
/**
@@ -118,7 +118,7 @@ public class FlumeAvroManager extends Ab
return current;
}
- protected synchronized void send(FlumeEvent event, int delay, int retries) {
+ public synchronized void send(FlumeEvent event, int delay, int retries) {
if (delay == 0) {
delay = DEFAULT_RECONNECTION_DELAY;
}
@@ -276,15 +276,18 @@ public class FlumeAvroManager extends Ab
* Factory data.
*/
private static class FactoryData {
+ private String name;
private Agent[] agents;
private int batchSize;
/**
* Constructor.
+ * @param name The name of the Appender.
* @param agents The agents.
* @param batchSize The number of events to include in a batch.
*/
- public FactoryData(Agent[] agents, int batchSize) {
+ public FactoryData(String name, Agent[] agents, int batchSize) {
+ this.name = name;
this.agents = agents;
this.batchSize = batchSize;
}
@@ -304,7 +307,7 @@ public class FlumeAvroManager extends Ab
public FlumeAvroManager createManager(String name, FactoryData data) {
try {
- return new FlumeAvroManager(name, data.agents, data.batchSize);
+ return new FlumeAvroManager(name, data.name, data.agents, data.batchSize);
} catch (Exception ex) {
LOGGER.error("Could not create FlumeAvroManager", ex);
}
Added: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeConfigurationBuilder.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeConfigurationBuilder.java?rev=1374701&view=auto
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeConfigurationBuilder.java (added)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeConfigurationBuilder.java Sun Aug 19 07:37:50 2012
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelFactory;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Sink;
+import org.apache.flume.SinkFactory;
+import org.apache.flume.SinkProcessor;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.Source;
+import org.apache.flume.SourceFactory;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.ChannelSelectorFactory;
+import org.apache.flume.channel.DefaultChannelFactory;
+import org.apache.flume.conf.BasicConfigurationConstants;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.conf.FlumeConfigurationError;
+import org.apache.flume.conf.channel.ChannelSelectorConfiguration;
+import org.apache.flume.conf.file.SimpleNodeConfiguration;
+import org.apache.flume.conf.sink.SinkConfiguration;
+import org.apache.flume.conf.sink.SinkGroupConfiguration;
+import org.apache.flume.conf.source.SourceConfiguration;
+import org.apache.flume.node.NodeConfiguration;
+import org.apache.flume.node.nodemanager.NodeConfigurationAware;
+import org.apache.flume.sink.DefaultSinkFactory;
+import org.apache.flume.sink.DefaultSinkProcessor;
+import org.apache.flume.sink.SinkGroup;
+import org.apache.flume.source.DefaultSourceFactory;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.config.ConfigurationException;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * See Flume's PropertiesFileConfigurationProvider. This class would extend that if it were possible.
+ */
+
+public class FlumeConfigurationBuilder {
+
+ private static final Logger LOGGER = StatusLogger.getLogger();
+
+ private final ChannelFactory channelFactory = new DefaultChannelFactory();
+ private final SourceFactory sourceFactory = new DefaultSourceFactory();
+ private final SinkFactory sinkFactory = new DefaultSinkFactory();
+
+ public NodeConfiguration load(String name, Properties props, NodeConfigurationAware configurationAware) {
+ NodeConfiguration conf = new SimpleNodeConfiguration();
+ FlumeConfiguration fconfig = new FlumeConfiguration(props);
+ List<FlumeConfigurationError> errors = fconfig.getConfigurationErrors();
+ if (errors.size() > 0) {
+ boolean isError = false;
+ for (FlumeConfigurationError error : errors) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(error.getComponentName()).append(" ").append(error.getKey()).append(" ");
+ sb.append(error.getErrorType().name()).append(" - ").append(error.getErrorType().getError());
+ switch (error.getErrorOrWarning()) {
+ case ERROR:
+ isError = true;
+ LOGGER.error(sb.toString());
+ break;
+ case WARNING:
+ LOGGER.warn(sb.toString());
+ break;
+ }
+ }
+ if (isError) {
+ for (String key : props.stringPropertyNames()) {
+ LOGGER.error(key + "=" + props.getProperty(key));
+ }
+ throw new ConfigurationException("Unable to configure Flume due to errors");
+ }
+ }
+ FlumeConfiguration.AgentConfiguration agentConf = fconfig.getConfigurationFor(name);
+
+ if (agentConf != null) {
+
+ loadChannels(agentConf, conf);
+ loadSources(agentConf, conf);
+ loadSinks(agentConf, conf);
+
+ configurationAware.startAllComponents(conf);
+ } else {
+ LOGGER.warn("No configuration found for: {}", name);
+ }
+ return conf;
+ }
+
+ protected void loadChannels(FlumeConfiguration.AgentConfiguration agentConf, NodeConfiguration conf) {
+ LOGGER.info("Creating channels");
+ Set<String> channels = agentConf.getChannelSet();
+ Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap();
+ for (String chName : channels) {
+ ComponentConfiguration comp = compMap.get(chName);
+ if (comp != null) {
+ Channel channel = channelFactory.create(comp.getComponentName(), comp.getType());
+
+ Configurables.configure(channel, comp);
+
+ conf.getChannels().put(comp.getComponentName(), channel);
+ }
+ }
+
+ for (String ch : channels) {
+ Context context = agentConf.getChannelContext().get(ch);
+ if (context != null) {
+ Channel channel = channelFactory.create(ch, context.getString(BasicConfigurationConstants.CONFIG_TYPE));
+ Configurables.configure(channel, context);
+ conf.getChannels().put(ch, channel);
+ LOGGER.info("created channel " + ch);
+ }
+ }
+ }
+
+ protected void loadSources(FlumeConfiguration.AgentConfiguration agentConf, NodeConfiguration conf) {
+
+ Set<String> sources = agentConf.getSourceSet();
+ Map<String, ComponentConfiguration> compMap = agentConf.getSourceConfigMap();
+ for (String sourceName : sources) {
+ ComponentConfiguration comp = compMap.get(sourceName);
+ if (comp != null) {
+ SourceConfiguration config = (SourceConfiguration) comp;
+
+ Source source = sourceFactory.create(comp.getComponentName(), comp.getType());
+
+ Configurables.configure(source, config);
+ Set<String> channelNames = config.getChannels();
+ List<Channel> channels = new ArrayList<Channel>();
+ for (String chName : channelNames) {
+ channels.add(conf.getChannels().get(chName));
+ }
+
+ ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();
+
+ ChannelSelector selector = ChannelSelectorFactory.create(channels, selectorConfig);
+
+ ChannelProcessor channelProcessor = new ChannelProcessor(selector);
+ Configurables.configure(channelProcessor, config);
+
+ source.setChannelProcessor(channelProcessor);
+ conf.getSourceRunners().put(comp.getComponentName(), SourceRunner.forSource(source));
+ }
+ }
+ Map<String, Context> sourceContexts = agentConf.getSourceContext();
+
+ for (String src : sources) {
+ Context context = sourceContexts.get(src);
+ if (context != null){
+ Source source = sourceFactory.create(src, context.getString(BasicConfigurationConstants.CONFIG_TYPE));
+ List<Channel> channels = new ArrayList<Channel>();
+ Configurables.configure(source, context);
+ String[] channelNames = context.getString(BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
+ for (String chName : channelNames) {
+ channels.add(conf.getChannels().get(chName));
+ }
+
+ Map<String, String> selectorConfig = context.getSubProperties(
+ BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
+
+ ChannelSelector selector = ChannelSelectorFactory.create(channels, selectorConfig);
+
+ ChannelProcessor channelProcessor = new ChannelProcessor(selector);
+ Configurables.configure(channelProcessor, context);
+
+ source.setChannelProcessor(channelProcessor);
+ conf.getSourceRunners().put(src, SourceRunner.forSource(source));
+ }
+ }
+ }
+
+ protected void loadSinks(FlumeConfiguration.AgentConfiguration agentConf, NodeConfiguration conf) {
+ Set<String> sinkNames = agentConf.getSinkSet();
+ Map<String, ComponentConfiguration> compMap = agentConf.getSinkConfigMap();
+ Map<String, Sink> sinks = new HashMap<String, Sink>();
+ for (String sinkName : sinkNames) {
+ ComponentConfiguration comp = compMap.get(sinkName);
+ if (comp != null) {
+ SinkConfiguration config = (SinkConfiguration) comp;
+ Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());
+
+ Configurables.configure(sink, config);
+
+ sink.setChannel(conf.getChannels().get(config.getChannel()));
+ sinks.put(comp.getComponentName(), sink);
+ }
+ }
+
+ Map<String, Context> sinkContexts = agentConf.getSinkContext();
+ for (String sinkName : sinkNames) {
+ Context context = sinkContexts.get(sinkName);
+ if (context != null) {
+ Sink sink = sinkFactory.create(sinkName, context.getString(BasicConfigurationConstants.CONFIG_TYPE));
+ Configurables.configure(sink, context);
+
+ sink.setChannel(conf.getChannels().get(context.getString(BasicConfigurationConstants.CONFIG_CHANNEL)));
+ sinks.put(sinkName, sink);
+ }
+ }
+
+ loadSinkGroups(agentConf, sinks, conf);
+ }
+
+ protected void loadSinkGroups(FlumeConfiguration.AgentConfiguration agentConf,
+ Map<String, Sink> sinks, NodeConfiguration conf) {
+ Set<String> sinkgroupNames = agentConf.getSinkgroupSet();
+ Map<String, ComponentConfiguration> compMap = agentConf.getSinkGroupConfigMap();
+ Map<String, String> usedSinks = new HashMap<String, String>();
+ for (String groupName : sinkgroupNames) {
+ ComponentConfiguration comp = compMap.get(groupName);
+ if (comp != null) {
+ SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
+ List<String> groupSinkList = groupConf.getSinks();
+ List<Sink> groupSinks = new ArrayList<Sink>();
+ for (String sink : groupSinkList) {
+ Sink s = sinks.remove(sink);
+ if (s == null) {
+ String sinkUser = usedSinks.get(sink);
+ if (sinkUser != null) {
+ throw new ConfigurationException(String.format(
+ "Sink %s of group %s already in use by group %s", sink, groupName, sinkUser));
+ } else {
+ throw new ConfigurationException(String.format(
+ "Sink %s of group %s does not exist or is not properly configured", sink,
+ groupName));
+ }
+ }
+ groupSinks.add(s);
+ usedSinks.put(sink, groupName);
+ }
+ SinkGroup group = new SinkGroup(groupSinks);
+ Configurables.configure(group, groupConf);
+ conf.getSinkRunners().put(comp.getComponentName(), new SinkRunner(group.getProcessor()));
+ }
+ }
+ // add any unasigned sinks to solo collectors
+ for (Map.Entry<String, Sink> entry : sinks.entrySet()) {
+ if (!usedSinks.containsValue(entry.getKey())) {
+ SinkProcessor pr = new DefaultSinkProcessor();
+ List<Sink> sinkMap = new ArrayList<Sink>();
+ sinkMap.add(entry.getValue());
+ pr.setSinks(sinkMap);
+ Configurables.configure(pr, new Context());
+ conf.getSinkRunners().put(entry.getKey(), new SinkRunner(pr));
+ }
+ }
+ }
+}
Added: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java?rev=1374701&view=auto
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java (added)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java Sun Aug 19 07:37:50 2012
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import org.apache.flume.SourceRunner;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.node.NodeConfiguration;
+import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
+import org.apache.logging.log4j.core.appender.ManagerFactory;
+import org.apache.logging.log4j.core.config.ConfigurationException;
+import org.apache.logging.log4j.core.config.Property;
+
+import java.security.MessageDigest;
+import java.util.Properties;
+
+/**
+ *
+ */
+public class FlumeEmbeddedManager extends FlumeManager {
+
+ private static ManagerFactory factory = new FlumeManagerFactory();
+
+ private final FlumeNode node;
+
+ private NodeConfiguration conf;
+
+ protected static final String SOURCE_NAME = "log4j-source";
+
+ private final Log4jEventSource source;
+
+ private final String shortName;
+
+
+ /**
+ * Constructor
+ * @param name The unique name of this manager.
+ * @param node The Flume Node.
+ */
+ protected FlumeEmbeddedManager(String name, String shortName, FlumeNode node) {
+ super(name);
+ this.node = node;
+ this.shortName = shortName;
+ SourceRunner runner = node.getConfiguration().getSourceRunners().get(SOURCE_NAME);
+ if (runner == null || runner.getSource() == null) {
+ throw new IllegalStateException("No Source has been created for Appender " + shortName);
+ }
+ source = (Log4jEventSource) runner.getSource();
+ }
+
+ /**
+ * Return a FlumeEmbeddedManager.
+ * @param agents The agents to use.
+ * @param batchSize The number of events to include in a batch.
+ * @return A FlumeAvroManager.
+ */
+ public static FlumeEmbeddedManager getManager(String name, Agent[] agents, Property[] properties, int batchSize) {
+
+ if (batchSize <= 0) {
+ batchSize = 1;
+ }
+
+ if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
+ throw new IllegalArgumentException("Either an Agent or properties are required");
+ } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
+ throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
+ }
+
+ StringBuilder sb = new StringBuilder();
+ boolean first = true;
+
+ if (agents != null && agents.length > 0) {
+ sb.append("FlumeEmbedded[");
+ for (Agent agent : agents) {
+ if (!first) {
+ sb.append(",");
+ }
+ sb.append(agent.getHost()).append(":").append(agent.getPort());
+ first = false;
+ }
+ sb.append("]");
+ } else {
+ String sep = "";
+ sb.append(name).append(":");
+ StringBuilder props = new StringBuilder();
+ for (Property prop : properties) {
+ props.append(sep);
+ props.append(prop.getName()).append("=").append(prop.getValue());
+ sep = ",";
+ }
+ try {
+ MessageDigest digest = MessageDigest.getInstance("MD5");
+ digest.update(sb.toString().getBytes());
+ byte[] bytes = digest.digest();
+ StringBuilder md5 = new StringBuilder();
+ for (byte b : bytes) {
+ String hex = Integer.toHexString(0xff & b);
+ if (hex.length() == 1) {
+ md5.append('0');
+ }
+ md5.append(hex);
+ }
+ sb.append(md5.toString());
+ } catch (Exception ex) {
+ sb.append(props);
+ }
+ }
+ return (FlumeEmbeddedManager) getManager(sb.toString(), factory,
+ new FactoryData(name, agents, properties, batchSize));
+ }
+
+ public void send(FlumeEvent event, int delay, int retries) {
+ source.send(event);
+ }
+
+ @Override
+ protected void releaseSub() {
+ node.stop();
+ }
+
+ /**
+ * Factory data.
+ */
+ private static class FactoryData {
+ private Agent[] agents;
+ private Property[] properties;
+ private int batchSize;
+ private String name;
+
+ /**
+ * Constructor.
+ * @param name The name of the Appender.
+ * @param agents The agents.
+ * @param properties The Flume configuration properties.
+ * @param batchSize The number of events to include in a batch.
+ */
+ public FactoryData(String name, Agent[] agents, Property[] properties, int batchSize) {
+ this.name = name;
+ this.agents = agents;
+ this.batchSize = batchSize;
+ this.properties = properties;
+ }
+ }
+
+ /**
+ * Avro Manager Factory.
+ */
+ private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
+ private static final String sourceType = Log4jEventSource.class.getName();
+
+ /**
+ * Create the FlumeAvroManager.
+ * @param name The name of the entity to manage.
+ * @param data The data required to create the entity.
+ * @return The FlumeAvroManager.
+ */
+ public FlumeEmbeddedManager createManager(String name, FactoryData data) {
+ try {
+ DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
+ Properties props = createProperties(data.name, data.agents, data.properties, data.batchSize);
+ FlumeConfigurationBuilder builder = new FlumeConfigurationBuilder();
+ NodeConfiguration conf = builder.load(data.name, props, nodeManager);
+
+ FlumeNode node = new FlumeNode(nodeManager, conf);
+
+ node.start();
+ LifecycleController.waitForOneOf(node, LifecycleState.START_OR_ERROR);
+
+ return new FlumeEmbeddedManager(name, data.name, node);
+ } catch (Exception ex) {
+ LOGGER.error("Could not create FlumeEmbeddedManager", ex);
+ }
+ return null;
+ }
+
+ private Properties createProperties(String name, Agent[] agents, Property[] properties, int batchSize) {
+ Properties props = new Properties();
+
+ if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
+ LOGGER.error("No Flume configuration provided");
+ throw new ConfigurationException("No Flume configuration provided");
+ }
+
+ if ((agents != null && agents.length > 0 && properties != null && properties.length > 0)) {
+ LOGGER.error("Agents and Flume configuration cannot both be specified");
+ throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
+ }
+
+ if (agents != null && agents.length > 0) {
+ props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
+ props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", sourceType);
+ props.put(name + ".channels", "file");
+ props.put(name + ".channels.file.type", "file");
+
+ StringBuilder sb = new StringBuilder();
+ String leading = "";
+ int priority = agents.length;
+ for (int i=0; i < agents.length; ++i) {
+ sb.append(leading).append("agent").append(i);
+ leading = " ";
+ String prefix = name + "sinks.agent" + i;
+ props.put(prefix + ".channel", "file");
+ props.put(prefix + ".type", "avro");
+ props.put(prefix + ".hostname", agents[i].getHost());
+ props.put(prefix + ".port", agents[i].getPort());
+ props.put(prefix + ".batch-size", batchSize);
+ props.put(name + ".sinkgroups.group1.sinks", "agent" +i);
+ props.put(name + ".sinkgroups.group1.processor.priority.agent" + i, Integer.toString(priority));
+ --priority;
+ }
+ props.put(name + ".sinks", sb.toString());
+ props.put(name + ".sinkgroups", "group1");
+ props.put(name + ".sinkgroups.group1.processor.type", "failover");
+ String sourceChannels = "file";
+ props.put(name + ".channels", sourceChannels);
+ props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
+ } else {
+ String channels = null;
+ String[] sinks = null;
+
+ props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
+ props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", sourceType);
+
+ for (Property property : properties) {
+ String key = property.getName();
+
+ if (key == null || key.length() == 0) {
+ String msg = "A property name must be provided";
+ LOGGER.error(msg);
+ throw new ConfigurationException(msg);
+ }
+
+ String upperKey = key.toUpperCase();
+
+ if (upperKey.startsWith(name.toUpperCase())) {
+ String msg = "Specification of the agent name is allowed in Flume Appender configuration: " + key;
+ LOGGER.error(msg);
+ throw new ConfigurationException(msg);
+ }
+
+ if (upperKey.startsWith("SOURCES.")) {
+ String msg = "Specification of Sources is not allowed in Flume Appender: " + key;
+ LOGGER.error(msg);
+ throw new ConfigurationException(msg);
+ }
+
+ String value = property.getValue();
+ if (value == null || value.length() == 0) {
+ String msg = "A value for property " + key + " must be provided";
+ LOGGER.error(msg);
+ throw new ConfigurationException(msg);
+ }
+
+ if (upperKey.equals("CHANNELS")) {
+ channels = value.trim();
+ } else if (upperKey.equals("SINKS")) {
+ sinks = value.trim().split(" ");
+ }
+
+ props.put(name + "." + key, value);
+ }
+
+ String sourceChannels = channels;
+
+ if (channels == null) {
+ sourceChannels = "file";
+ props.put(name + ".channels", sourceChannels);
+ }
+
+ props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
+
+ if (sinks == null || sinks.length == 0) {
+ String msg = "At least one Sink must be specified";
+ LOGGER.error(msg);
+ throw new ConfigurationException(msg);
+ }
+ }
+ return props;
+ }
+
+ }
+
+}
Added: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeManager.java?rev=1374701&view=auto
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeManager.java (added)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeManager.java Sun Aug 19 07:37:50 2012
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import org.apache.logging.log4j.core.appender.AbstractManager;
+
+/**
+ *
+ */
+public abstract class FlumeManager extends AbstractManager {
+
+ public FlumeManager(String name) {
+ super(name);
+ }
+
+ public abstract void send(FlumeEvent event, int delay, int retries);
+}
Added: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeNode.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeNode.java?rev=1374701&view=auto
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeNode.java (added)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeNode.java Sun Aug 19 07:37:50 2012
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import com.google.common.base.Preconditions;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.lifecycle.LifecycleSupervisor;
+import org.apache.flume.node.NodeConfiguration;
+import org.apache.flume.node.NodeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class FlumeNode implements LifecycleAware {
+
+ private static final Logger logger = LoggerFactory.getLogger(FlumeNode.class);
+
+ private LifecycleState lifecycleState;
+ private final NodeManager nodeManager;
+ private final LifecycleSupervisor supervisor;
+ private final NodeConfiguration conf;
+
+ public FlumeNode(NodeManager manager, NodeConfiguration conf) {
+ this.nodeManager = manager;
+ this.conf =conf;
+ supervisor = new LifecycleSupervisor();
+ }
+
+ public void start() {
+
+ Preconditions.checkState(nodeManager != null,
+ "Node manager can not be null");
+
+ supervisor.start();
+
+ logger.info("Flume node starting");
+
+ supervisor.supervise(nodeManager,
+ new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+
+ lifecycleState = LifecycleState.START;
+ }
+
+ public void stop() {
+
+ logger.info("Flume node stopping");
+
+ supervisor.stop();
+
+ lifecycleState = LifecycleState.STOP;
+ }
+
+ public NodeManager getNodeManager() {
+ return nodeManager;
+ }
+
+ public NodeConfiguration getConfiguration() {
+ return conf;
+ }
+
+ public LifecycleState getLifecycleState() {
+ return lifecycleState;
+ }
+
+}
Added: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java?rev=1374701&view=auto
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java (added)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java Sun Aug 19 07:37:50 2012
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import org.apache.flume.ChannelException;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.flume.source.AbstractSource;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ *
+ */
+public class Log4jEventSource extends AbstractSource implements EventDrivenSource {
+
+ private SourceCounter sourceCounter = new SourceCounter("log4j");
+
+ private static final Logger logger = LoggerFactory.getLogger(Log4jEventSource.class);
+
+ public Log4jEventSource() {
+ setName("Log4jEvent");
+ }
+
+ @Override
+ public synchronized void start() {
+ super.start();
+
+ logger.info("Log4j Source started");
+ }
+
+ @Override
+ public synchronized void stop() {
+ super.stop();
+
+ logger.info("Log4j Source stopped. Metrics {}", sourceCounter);
+ }
+
+
+ public void send(FlumeEvent event) {
+ sourceCounter.incrementAppendReceivedCount();
+ sourceCounter.incrementEventReceivedCount();
+ try {
+ getChannelProcessor().processEvent(event);
+ } catch (ChannelException ex) {
+ logger.warn("Unabled to process event {}" + event, ex);
+ throw ex;
+ }
+ }
+}
Copied: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java (from r1371545, logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppenderTest.java)
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java?p2=logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java&p1=logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppenderTest.java&r1=1371545&r2=1374701&rev=1374701&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppenderTest.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java Sun Aug 19 07:37:50 2012
@@ -35,7 +35,6 @@ import org.apache.logging.log4j.LogManag
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Logger;
import org.apache.logging.log4j.core.LoggerContext;
-import org.apache.logging.log4j.core.config.plugins.PluginManager;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -55,7 +54,7 @@ import java.util.zip.GZIPInputStream;
/**
*
*/
-public class FlumeAvroAppenderTest {
+public class FlumeAppenderTest {
private static LoggerContext ctx;
@@ -124,8 +123,8 @@ public class FlumeAvroAppenderTest {
@Test
public void testLog4jAvroAppender() throws InterruptedException, IOException {
Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
- FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null,
- null, null, null, null, "true", "1", null, null, null);
+ FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "100", "3", "avro", "false",
+ null, null, null, null, null, "true", "1", null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
@@ -151,8 +150,8 @@ public class FlumeAvroAppenderTest {
@Test
public void testMultiple() throws InterruptedException, IOException {
Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
- FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null,
- null, null, null, null, "true", "1", null, null, null);
+ FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "100", "3", "avro", "false",
+ null, null, null, null, null, "true", "1", null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
@@ -181,8 +180,8 @@ public class FlumeAvroAppenderTest {
@Test
public void testBatch() throws InterruptedException, IOException {
Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
- FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null,
- null, null, null, null, "true", "10", null, null, null);
+ FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "100", "3", "avro", "false",
+ null, null, null, null, null, "true", "10", null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
@@ -212,8 +211,8 @@ public class FlumeAvroAppenderTest {
@Test
public void testConnectionRefused() {
Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
- FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null,
- null, null, null, null, "true", "1", null, null, null);
+ FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "100", "3", "avro", "false",
+ null, null, null, null, null, "true", "1", null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
@@ -239,8 +238,8 @@ public class FlumeAvroAppenderTest {
String altPort = Integer.toString(Integer.parseInt(testPort) + 1);
Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort),
Agent.createAgent("localhost", altPort)};
- FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null,
- null, null, null, null, "true", "1", null, null, null);
+ FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "100", "3", "avro", "false",
+ null, null, null, null, null, "true", "1", null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
Added: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java?rev=1374701&view=auto
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java (added)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java Sun Aug 19 07:37:50 2012
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.source.AvroSource;
+import org.apache.logging.log4j.EventLogger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.XMLConfigurationFactory;
+import org.apache.logging.log4j.message.StructuredDataMessage;
+import org.apache.logging.log4j.status.StatusLogger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.zip.GZIPInputStream;
+
+/**
+ *
+ */
+public class FlumeEmbeddedAgentTest {
+ private static final String CONFIG = "default_embedded.xml";
+ private static LoggerContext ctx;
+
+ private static final int testServerPort = 12345;
+
+ private AvroSource primarySource;
+ private AvroSource altSource;
+ private Channel channel;
+
+ private String testPort;
+ private String altPort;
+
+ @BeforeClass
+ public static void setupClass() {
+ // System.setProperty(DefaultConfiguration.DEFAULT_LEVEL, Level.DEBUG.toString());
+ }
+
+ @AfterClass
+ public static void cleanupClass() {
+ StatusLogger.getLogger().reset();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ primarySource = new AvroSource();
+ primarySource.setName("Primary");
+ altSource = new AvroSource();
+ altSource.setName("Alternate");
+ channel = new MemoryChannel();
+
+ Configurables.configure(channel, new Context());
+
+ /*
+ * Clear out all other appenders associated with this logger to ensure we're
+ * only hitting the Avro appender.
+ */
+ Context context = new Context();
+ testPort = String.valueOf(testServerPort);
+ context.put("port", testPort);
+ context.put("bind", "localhost");
+ Configurables.configure(primarySource, context);
+
+ context = new Context();
+ altPort = String.valueOf(testServerPort + 1);
+ context.put("port", altPort);
+ context.put("bind", "localhost");
+ Configurables.configure(altSource, context);
+
+ List<Channel> channels = new ArrayList<Channel>();
+ channels.add(channel);
+
+ ChannelSelector cs = new ReplicatingChannelSelector();
+ cs.setChannels(channels);
+
+ primarySource.setChannelProcessor(new ChannelProcessor(cs));
+ altSource.setChannelProcessor(new ChannelProcessor(cs));
+
+ primarySource.start();
+ altSource.start();
+
+ Assert.assertTrue("Reached start or error", LifecycleController.waitForOneOf(
+ primarySource, LifecycleState.START_OR_ERROR));
+ Assert.assertEquals("Server is started", LifecycleState.START, primarySource.getLifecycleState());
+ System.setProperty(XMLConfigurationFactory.CONFIGURATION_FILE_PROPERTY, CONFIG);
+ ctx = (LoggerContext) LogManager.getContext(false);
+ ctx.reconfigure();
+ }
+
+ @After
+ public void teardown() throws Exception {
+ System.clearProperty(XMLConfigurationFactory.CONFIGURATION_FILE_PROPERTY);
+ ctx.reconfigure();
+ primarySource.stop();
+ altSource.stop();
+ Assert.assertTrue("Reached stop or error",
+ LifecycleController.waitForOneOf(primarySource, LifecycleState.STOP_OR_ERROR));
+ Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+ primarySource.getLifecycleState());
+ MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+ Set<ObjectName> names = server.queryNames(new ObjectName("org.apache.flume.*:*"), null);
+ for (ObjectName name : names) {
+ try {
+ server.unregisterMBean(name);
+ } catch (Exception ex) {
+ System.out.println("Unable to unregister " + name.toString());
+ }
+ }
+ }
+
+ @Test
+ public void testLog4Event() throws InterruptedException, IOException {
+
+ StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Message", "Test");
+ EventLogger.logEvent(msg);
+
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+
+ Event event = channel.take();
+ Assert.assertNotNull(event);
+ String body = getBody(event);
+ Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
+ body.endsWith("Test Message"));
+ transaction.commit();
+ transaction.close();
+
+ primarySource.stop();
+ }
+
+ @Test
+ public void testMultiple() throws InterruptedException, IOException {
+
+ for (int i = 0; i < 10; ++i) {
+ StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Message " + i, "Test");
+ EventLogger.logEvent(msg);
+ }
+ for (int i = 0; i < 10; ++i) {
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+
+ Event event = channel.take();
+ Assert.assertNotNull(event);
+ String body = getBody(event);
+ Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
+ body.endsWith("Test Message " + i));
+ transaction.commit();
+ transaction.close();
+ }
+
+ primarySource.stop();
+ }
+
+ @Test
+ public void testFailover() throws InterruptedException, IOException {
+ Logger logger = LogManager.getLogger("testFailover");
+ logger.debug("Starting testFailover");
+ for (int i = 0; i < 10; ++i) {
+ StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Message " + i, "Test");
+ EventLogger.logEvent(msg);
+ }
+ for (int i = 0; i < 10; ++i) {
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+
+ Event event = channel.take();
+ Assert.assertNotNull(event);
+ String body = getBody(event);
+ Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
+ body.endsWith("Test Message " + i));
+ transaction.commit();
+ transaction.close();
+ }
+
+ primarySource.stop();
+
+
+ for (int i = 0; i < 10; ++i) {
+ StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Message " + i, "Test");
+ EventLogger.logEvent(msg);
+ }
+ for (int i = 0; i < 10; ++i) {
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+
+ Event event = channel.take();
+ Assert.assertNotNull(event);
+ String body = getBody(event);
+ Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
+ body.endsWith("Test Message " + i));
+ transaction.commit();
+ transaction.close();
+ }
+ }
+
+
+ private String getBody(Event event) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody()));
+ int n = 0;
+ while (-1 != (n = is.read())) {
+ baos.write(n);
+ }
+ return new String(baos.toByteArray());
+
+ }
+}
Added: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java?rev=1374701&view=auto
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java (added)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java Sun Aug 19 07:37:50 2012
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.source.AvroSource;
+import org.apache.logging.log4j.EventLogger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.XMLConfigurationFactory;
+import org.apache.logging.log4j.message.StructuredDataMessage;
+import org.apache.logging.log4j.status.StatusLogger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.zip.GZIPInputStream;
+
+/**
+ *
+ */
+public class FlumeEmbeddedAppenderTest {
+ private static final String CONFIG = "embedded.xml";
+ private static LoggerContext ctx;
+
+ private static final int testServerPort = 12345;
+
+ private AvroSource primarySource;
+ private AvroSource altSource;
+ private Channel channel;
+
+ private String testPort;
+ private String altPort;
+ private int counter;
+
+ @BeforeClass
+ public static void setupClass() {
+ // System.setProperty(DefaultConfiguration.DEFAULT_LEVEL, Level.DEBUG.toString());
+ }
+
+ @AfterClass
+ public static void cleanupClass() {
+ StatusLogger.getLogger().reset();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ primarySource = new AvroSource();
+ primarySource.setName("Primary");
+ altSource = new AvroSource();
+ altSource.setName("Alternate");
+ channel = new MemoryChannel();
+ channel.setName("Memory");
+ ++counter;
+
+ Configurables.configure(channel, new Context());
+
+ /*
+ * Clear out all other appenders associated with this logger to ensure we're
+ * only hitting the Avro appender.
+ */
+ Context context = new Context();
+ testPort = String.valueOf(testServerPort);
+ context.put("port", testPort);
+ context.put("bind", "localhost");
+ Configurables.configure(primarySource, context);
+
+ context = new Context();
+ altPort = String.valueOf(testServerPort + 1);
+ context.put("port", altPort);
+ context.put("bind", "localhost");
+ Configurables.configure(altSource, context);
+
+ List<Channel> channels = new ArrayList<Channel>();
+ channels.add(channel);
+
+ ChannelSelector cs = new ReplicatingChannelSelector();
+ cs.setChannels(channels);
+
+ primarySource.setChannelProcessor(new ChannelProcessor(cs));
+ altSource.setChannelProcessor(new ChannelProcessor(cs));
+
+ primarySource.start();
+ altSource.start();
+
+ Assert.assertTrue("Reached start or error", LifecycleController.waitForOneOf(
+ primarySource, LifecycleState.START_OR_ERROR));
+ Assert.assertEquals("Server is started", LifecycleState.START, primarySource.getLifecycleState());
+ System.setProperty(XMLConfigurationFactory.CONFIGURATION_FILE_PROPERTY, CONFIG);
+ ctx = (LoggerContext) LogManager.getContext(false);
+ ctx.reconfigure();
+ }
+
+ @After
+ public void teardown() throws Exception {
+ System.clearProperty(XMLConfigurationFactory.CONFIGURATION_FILE_PROPERTY);
+ ctx.reconfigure();
+ primarySource.stop();
+ altSource.stop();
+ Assert.assertTrue("Reached stop or error",
+ LifecycleController.waitForOneOf(primarySource, LifecycleState.STOP_OR_ERROR));
+ Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+ primarySource.getLifecycleState());
+ MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+ Set<ObjectName> names = server.queryNames(new ObjectName("org.apache.flume.*:*"), null);
+ for (ObjectName name : names) {
+ try {
+ server.unregisterMBean(name);
+ } catch (Exception ex) {
+ System.out.println("Unable to unregister " + name.toString());
+ }
+ }
+ }
+
+ @Test
+ public void testLog4Event() throws InterruptedException, IOException {
+
+ StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Message", "Test");
+ EventLogger.logEvent(msg);
+
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+
+ Event event = channel.take();
+ Assert.assertNotNull(event);
+ String body = getBody(event);
+ Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
+ body.endsWith("Test Message"));
+ transaction.commit();
+ transaction.close();
+
+ primarySource.stop();
+ }
+
+ @Test
+ public void testMultiple() throws InterruptedException, IOException {
+
+ for (int i = 0; i < 10; ++i) {
+ StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Message " + i, "Test");
+ EventLogger.logEvent(msg);
+ }
+ for (int i = 0; i < 10; ++i) {
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+
+ Event event = channel.take();
+ Assert.assertNotNull(event);
+ String body = getBody(event);
+ Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
+ body.endsWith("Test Message " + i));
+ transaction.commit();
+ transaction.close();
+ }
+
+ primarySource.stop();
+ }
+
+
+ @Test
+ public void testFailover() throws InterruptedException, IOException {
+ Logger logger = LogManager.getLogger("testFailover");
+ logger.debug("Starting testFailover");
+ for (int i = 0; i < 10; ++i) {
+ StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Message " + i, "Test");
+ EventLogger.logEvent(msg);
+ }
+ for (int i = 0; i < 10; ++i) {
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+
+ Event event = channel.take();
+ Assert.assertNotNull(event);
+ String body = getBody(event);
+ Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
+ body.endsWith("Test Message " + i));
+ transaction.commit();
+ transaction.close();
+ }
+
+ primarySource.stop();
+
+
+ for (int i = 0; i < 10; ++i) {
+ StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Message " + i, "Test");
+ EventLogger.logEvent(msg);
+ }
+ for (int i = 0; i < 10; ++i) {
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+
+ Event event = channel.take();
+ Assert.assertNotNull(event);
+ String body = getBody(event);
+ Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
+ body.endsWith("Test Message " + i));
+ transaction.commit();
+ transaction.close();
+ }
+ }
+
+
+ private String getBody(Event event) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody()));
+ int n = 0;
+ while (-1 != (n = is.read())) {
+ baos.write(n);
+ }
+ return new String(baos.toByteArray());
+
+ }
+}
Added: logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml?rev=1374701&view=auto
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml (added)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml Sun Aug 19 07:37:50 2012
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration status="warn" name="MyApp" packages="">
+ <appenders>
+ <Flume name="eventLogger" suppressExceptions="false" compress="true">
+ <Agent host="localhost" port="12345"/>
+ <Agent host="localhost" port="12346"/>
+ <RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>
+ </Flume>
+ <Console name="STDOUT">
+ <PatternLayout pattern="%d [%p] %c %m%n"/>
+ </Console>
+ </appenders>
+ <loggers>
+ <logger name="EventLogger" level="info">
+ <appender-ref ref="eventLogger"/>
+ </logger>
+ <root level="warn">
+ <appender-ref ref="STDOUT"/>
+ </root>
+ </loggers>
+</configuration>
\ No newline at end of file
Added: logging/log4j/log4j2/trunk/flume-ng/src/test/resources/embedded.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/resources/embedded.xml?rev=1374701&view=auto
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/resources/embedded.xml (added)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/resources/embedded.xml Sun Aug 19 07:37:50 2012
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration status="error" name="MyApp" packages="">
+ <appenders>
+ <Flume name="eventLogger" suppressExceptions="false" compress="true" embedded="true">
+ <Property name="channels">file</Property>
+ <Property name="channels.file.type">file</Property>
+ <Property name="channels.file.checkpointDir">target/file-channel/checkpoint</Property>
+ <Property name="channels.file.dataDirs">target/file-channel/data</Property>
+ <Property name="sinks">agent1 agent2</Property>
+ <Property name="sinks.agent1.channel">file</Property>
+ <Property name="sinks.agent1.type">avro</Property>
+ <Property name="sinks.agent1.hostname">localhost</Property>
+ <Property name="sinks.agent1.port">12345</Property>
+ <Property name="sinks.agent1.batch-size">1</Property>
+ <Property name="sinks.agent2.channel">file</Property>
+ <Property name="sinks.agent2.type">avro</Property>
+ <Property name="sinks.agent2.hostname">localhost</Property>
+ <Property name="sinks.agent2.port">12346</Property>
+ <Property name="sinks.agent2.batch-size">1</Property>
+ <Property name="sinkgroups">group1</Property>
+ <Property name="sinkgroups.group1.sinks">agent1 agent2</Property>
+ <Property name="sinkgroups.group1.processor.type">failover</Property>
+ <Property name="sinkgroups.group1.processor.priority.agent1">10</Property>
+ <Property name="sinkgroups.group1.processor.priority.agent2">5</Property>
+ <RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>
+ </Flume>
+ <Console name="STDOUT">
+ <PatternLayout pattern="%d [%p] %c %m%n"/>
+ </Console>
+ </appenders>
+ <loggers>
+ <logger name="EventLogger" level="info">
+ <appender-ref ref="eventLogger"/>
+ </logger>
+ <root level="warn">
+ <appender-ref ref="STDOUT"/>
+ </root>
+ </loggers>
+</configuration>
\ No newline at end of file
Modified: logging/log4j/log4j2/trunk/pom.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/pom.xml?rev=1374701&r1=1374700&r2=1374701&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/pom.xml (original)
+++ logging/log4j/log4j2/trunk/pom.xml Sun Aug 19 07:37:50 2012
@@ -158,6 +158,11 @@
<artifactId>log4j-jcl</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j12-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
Modified: logging/log4j/log4j2/trunk/src/changes/changes.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/src/changes/changes.xml?rev=1374701&r1=1374700&r2=1374701&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/src/changes/changes.xml (original)
+++ logging/log4j/log4j2/trunk/src/changes/changes.xml Sun Aug 19 07:37:50 2012
@@ -23,7 +23,10 @@
<body>
<release version="2.0-alpha2" date="TBD" description="Bug fixes and minor enhancements">
- <action dev="rgoers" type="add">
+ <action issue="LOG4J2-69" dev="rgoers" type="add">
+ Allow Flume agents to be embedded into the Flume Appender.
+ </action>
+ <action issue="LOG4J2-68" dev="rgoers" type="add">
Add support for formatting using String.format().
</action>
<action issue="LOG4J2-67" dev="rgoers" type="add">