You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by rg...@apache.org on 2012/01/02 19:52:52 UTC
svn commit: r1226514 [2/2] - in
/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers: ./ log4j2-core/
log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/
log4j2-core/src/test/java/org/apache/logging/log4j/core/appender/flume/
log4...
Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroManager.java?rev=1226514&view=auto
==============================================================================
--- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroManager.java (added)
+++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroManager.java Mon Jan 2 18:52:50 2012
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flumeog.appender;
+
+import com.cloudera.flume.handlers.avro.AvroFlumeEvent;
+import org.apache.avro.ipc.HttpTransceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.logging.log4j.core.appender.AbstractManager;
+import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
+import org.apache.logging.log4j.core.appender.ManagerFactory;
+
+import com.cloudera.flume.handlers.avro.FlumeEventAvroServer;
+import com.cloudera.flume.handlers.avro.AvroEventConvertUtil;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+/**
+ * Manager for FlumeAvroAppenders.
+ */
+public class FlumeAvroManager extends AbstractManager {
+
+ /**
+ The default reconnection delay (500 milliseconds or .5 seconds).
+ */
+ public static final int DEFAULT_RECONNECTION_DELAY = 500;
+
+ private static final int DEFAULT_RECONNECTS = 3;
+
+ private static ManagerFactory factory = new AvroManagerFactory();
+
+ private FlumeEventAvroServer client;
+
+ private final Agent[] agents;
+
+ private int current = 0;
+
+ protected FlumeAvroManager(String name, Agent[] agents) {
+ super(name);
+ this.agents = agents;
+ this.client = connect(agents);
+ }
+
+ /**
+ * Return a FlumeAvroManager.
+ * @param agents The agents to use.
+ * @return A FlumeAvroManager.
+ */
+ public static FlumeAvroManager getManager(Agent[] agents) {
+ if (agents == null || agents.length == 0) {
+ throw new IllegalArgumentException("At least one agent is required");
+ }
+
+ StringBuilder sb = new StringBuilder("FlumeAvro[");
+ boolean first = true;
+ for (Agent agent : agents) {
+ if (!first) {
+ sb.append(",");
+ }
+ sb.append(agent.getHost()).append(":").append(agent.getPort());
+ first = false;
+ }
+ sb.append("]");
+ return (FlumeAvroManager) getManager(sb.toString(), factory, new FactoryData(agents));
+ }
+
+ /**
+ * Return the agents.
+ * @return The agent array.
+ */
+ public Agent[] getAgents() {
+ return agents;
+ }
+
+ /**
+ * Returns the index of the current agent.
+ * @return The index for the current agent.
+ */
+ public int getCurrent() {
+ return current;
+ }
+
+ protected synchronized void send(FlumeEvent event, int delay, int retries) {
+ if (delay == 0) {
+ delay = DEFAULT_RECONNECTION_DELAY;
+ }
+ if (retries == 0) {
+ retries = DEFAULT_RECONNECTS;
+ }
+ AvroFlumeEvent avroEvent = AvroEventConvertUtil.toAvroEvent(event);
+ int i = 0;
+
+ String msg = "Error writing to " + getName();
+
+ do {
+ try {
+ client.append(avroEvent);
+ return;
+ } catch (Exception ex) {
+ if (i == retries - 1) {
+ msg = "Error writing to " + getName() + " at " + agents[0].getHost() + ":" + agents[0].getPort();
+ LOGGER.warn(msg, ex);
+ break;
+ }
+ sleep(delay);
+ }
+ } while (++i < retries);
+
+ for (int index = 0; index < agents.length; ++index) {
+ if (index == current) {
+ continue;
+ }
+ Agent agent = agents[index];
+ i = 0;
+ do {
+ try {
+
+ FlumeEventAvroServer c = connect(agent.getHost(), agent.getPort());
+ c.append(avroEvent);
+ client = c;
+ current = i;
+ return;
+ } catch (Exception ex) {
+ if (i == retries - 1) {
+ String warnMsg = "Error writing to " + getName() + " at " + agent.getHost() + ":" +
+ agent.getPort();
+ LOGGER.warn(warnMsg, ex);
+ break;
+ }
+ sleep(delay);
+ }
+ } while (++i < retries);
+ }
+
+ throw new AppenderRuntimeException(msg);
+
+ }
+
+ private void sleep(int delay) {
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * There is a very good chance that this will always return the first agent even if it isn't available.
+ * @param agents The list of agents to choose from
+ * @return The FlumeEventAvroServer.
+ */
+ private FlumeEventAvroServer connect(Agent[] agents) {
+ int i = 0;
+ for (Agent agent : agents) {
+ FlumeEventAvroServer server = connect(agent.getHost(), agent.getPort());
+ if (server != null) {
+ current = i;
+ return server;
+ }
+ ++i;
+ }
+ throw new AppenderRuntimeException("Unable to connect to any agents");
+ }
+
+ private FlumeEventAvroServer connect(String hostname, int port) {
+ URL url;
+
+ try {
+ url = new URL("http", hostname, port, "/");
+ } catch (MalformedURLException ex) {
+ LOGGER.error("Unable to create a URL for hostname " + hostname + " at port " + port, ex);
+ return null;
+ }
+
+ try {
+ return SpecificRequestor.getClient(FlumeEventAvroServer.class, new HttpTransceiver(url));
+ } catch (IOException ioe) {
+ LOGGER.error("Unable to create Avro client");
+ return null;
+ }
+ }
+
+ /**
+ * Factory data.
+ */
+ private static class FactoryData {
+ private Agent[] agents;
+
+ /**
+ * Constructor.
+ * @param agents The agents.
+ */
+ public FactoryData(Agent[] agents) {
+ this.agents = agents;
+ }
+ }
+
+ /**
+ * Avro Manager Factory.
+ */
+ private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
+
+ /**
+ * Create the FlumeAvroManager.
+ * @param name The name of the entity to manage.
+ * @param data The data required to create the entity.
+ * @return The FlumeAvroManager.
+ */
+ public FlumeAvroManager createManager(String name, FactoryData data) {
+ try {
+
+ return new FlumeAvroManager(name, data.agents);
+ } catch (Exception ex) {
+ LOGGER.error("Could not create FlumeAvroManager", ex);
+ }
+ return null;
+ }
+ }
+
+}
Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEvent.java
URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEvent.java?rev=1226514&view=auto
==============================================================================
--- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEvent.java (added)
+++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEvent.java Mon Jan 2 18:52:50 2012
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flumeog.appender;
+
+import com.cloudera.flume.core.EventBaseImpl;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LoggingException;
+import org.apache.logging.log4j.Marker;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.helpers.UUIDUtil;
+import org.apache.logging.log4j.message.MapMessage;
+import org.apache.logging.log4j.message.Message;
+import org.apache.logging.log4j.message.StructuredDataId;
+import org.apache.logging.log4j.message.StructuredDataMessage;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Class that is both a Flume and Log4j Event.
+ */
+public class FlumeEvent extends EventBaseImpl implements LogEvent {
+
+ private static final String DEFAULT_MDC_PREFIX = "mdc:";
+
+ private static final String DEFAULT_EVENT_PREFIX = "";
+
+ private static final String EVENT_TYPE = "eventType";
+
+ private static final String EVENT_ID = "eventId";
+
+ private static final String GUID = "guId";
+
+ private final LogEvent event;
+
+ private byte[] body;
+
+ private final String hostname;
+
+ private final Map<String, String> ctx = new HashMap<String, String>();
+
+ private final boolean compress;
+
+ /**
+ * Construct the FlumeEvent.
+ * @param event The Log4j LogEvent.
+ * @param hostname The host name.
+ * @param includes A comma separated list of MDC elements to include.
+ * @param excludes A comma separated list of MDC elements to exclude.
+ * @param required A comma separated list of MDC elements that are required to be defined.
+ * @param mdcPrefix The value to prefix to MDC keys.
+ * @param eventPrefix The value to prefix to event keys.
+ * @param compress If true the event body should be compressed.
+ */
+ public FlumeEvent(LogEvent event, String hostname, String includes, String excludes, String required,
+ String mdcPrefix, String eventPrefix, boolean compress) {
+ this.event = event;
+ this.hostname = hostname;
+ this.compress = compress;
+ if (mdcPrefix == null) {
+ mdcPrefix = DEFAULT_MDC_PREFIX;
+ }
+ if (eventPrefix == null) {
+ eventPrefix = DEFAULT_EVENT_PREFIX;
+ }
+ this.fields = new HashMap<String, byte[]>();
+ Map<String, String> mdc = event.getContextMap();
+ if (includes != null) {
+ String[] array = includes.split(",");
+ if (array.length > 0) {
+ for (String str : array) {
+ if (mdc.containsKey(str)) {
+ ctx.put(str, mdc.get(str));
+ }
+ }
+ }
+ } else if (excludes != null) {
+ String[] array = excludes.split(",");
+ if (array.length > 0) {
+ List<String> list = Arrays.asList(array);
+ for (Map.Entry<String, String> entry : mdc.entrySet()) {
+ if (!list.contains(entry.getKey())) {
+ ctx.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ }
+
+ if (required != null) {
+ String[] array = required.split(",");
+ if (array.length > 0) {
+ for (String str : array) {
+ if (!mdc.containsKey(str)) {
+ throw new LoggingException("Required key " + str + " is missing from the MDC");
+ }
+ }
+ }
+ }
+ Message message = event.getMessage();
+ if (message instanceof MapMessage) {
+ if (message instanceof StructuredDataMessage) {
+ addStructuredData(eventPrefix, fields, (StructuredDataMessage) message);
+ }
+ addMapData(eventPrefix, fields, (MapMessage) message);
+ }
+
+ addContextData(mdcPrefix, fields, ctx);
+
+ addGuid(fields);
+ }
+
+ protected void addStructuredData(String prefix, Map<String, byte[]> fields, StructuredDataMessage msg) {
+ fields.put(prefix + EVENT_TYPE, msg.getType().getBytes());
+ StructuredDataId id = msg.getId();
+ fields.put(prefix + EVENT_ID, id.getName().getBytes());
+ }
+
+ protected void addMapData(String prefix, Map<String, byte[]> fields, MapMessage msg) {
+ Map<String, String> data = msg.getData();
+ for (Map.Entry<String, String> entry : data.entrySet()) {
+ fields.put(prefix + entry.getKey(), entry.getValue().getBytes());
+ }
+ }
+
+ protected void addContextData(String prefix, Map<String, byte[]> fields, Map<String, String> context) {
+ for (Map.Entry<String, String> entry : ctx.entrySet()) {
+ fields.put(prefix + entry.getKey(), entry.getValue().toString().getBytes());
+ }
+ }
+
+ protected void addGuid(Map<String, byte[]> fields) {
+ fields.put(GUID, UUIDUtil.getTimeBasedUUID().toString().getBytes());
+ }
+
+ /**
+ * Set the body in the event.
+ * @param body The body to add to the event.
+ */
+ public void setBody(byte[] body) {
+ if (body == null || body.length == 0) {
+ this.body = new byte[0];
+ return;
+ }
+ if (compress) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ GZIPOutputStream os = new GZIPOutputStream(baos);
+ os.write(body);
+ os.close();
+ } catch (IOException ioe) {
+ throw new LoggingException("Unable to compress message", ioe);
+ }
+ this.body = baos.toByteArray();
+ } else {
+ this.body = body;
+ }
+ }
+
+ @Override
+ public byte[] getBody() {
+ return this.body;
+ }
+
+ @Override
+ public Priority getPriority() {
+ switch (event.getLevel()) {
+ case INFO:
+ return Priority.INFO;
+ case ERROR:
+ return Priority.ERROR;
+ case DEBUG:
+ return Priority.DEBUG;
+ case WARN:
+ return Priority.WARN;
+ case TRACE:
+ return Priority.TRACE;
+ case FATAL:
+ return Priority.FATAL;
+ }
+ return Priority.INFO;
+ }
+
+ /**
+ * Get the Frequently Qualified Class Name.
+ * @return the FQCN String.
+ */
+ public String getFQCN() {
+ return event.getFQCN();
+ }
+
+ @Override
+ public long getTimestamp() {
+ return event.getMillis();
+ }
+
+ @Override
+ public long getNanos() {
+ return System.nanoTime();
+ }
+
+ @Override
+ public String getHost() {
+ return hostname;
+ }
+
+ /**
+ * Return the logging Level.
+ * @return the Level.
+ */
+ public Level getLevel() {
+ return event.getLevel();
+ }
+
+ /**
+ * Return the logger name.
+ * @return the logger name.
+ */
+ public String getLoggerName() {
+ return event.getLoggerName();
+ }
+
+ /**
+ * Return the StackTraceElement for the caller of the logging API.
+ * @return the StackTraceElement of the caller.
+ */
+ public StackTraceElement getSource() {
+ return event.getSource();
+ }
+
+ /**
+ * Return the Message.
+ * @return the Message.
+ */
+ public Message getMessage() {
+ return event.getMessage();
+ }
+
+ /**
+ * Return the Marker.
+ * @return the Marker.
+ */
+ public Marker getMarker() {
+ return event.getMarker();
+ }
+
+ /**
+ * Return the name of the Thread.
+ * @return the name of the Thread.
+ */
+ public String getThreadName() {
+ return event.getThreadName();
+ }
+
+ /**
+ * Return the event timestamp.
+ * @return the event timestamp.
+ */
+ public long getMillis() {
+ return event.getMillis();
+ }
+
+ /**
+ * Return the Throwable associated with the event, if any.
+ * @return the Throwable.
+ */
+ public Throwable getThrown() {
+ return event.getThrown();
+ }
+
+ /**
+ * Return a copy of the context Map.
+ * @return a copy of the context Map.
+ */
+ public Map<String, String> getContextMap() {
+ return ctx;
+ }
+
+ /**
+ * Return a copy of the context stack.
+ * @return a copy of the context stack.
+ */
+ public Stack<String> getContextStack() {
+ return event.getContextStack();
+ }
+}
Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEventFactory.java
URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEventFactory.java?rev=1226514&view=auto
==============================================================================
--- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEventFactory.java (added)
+++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEventFactory.java Mon Jan 2 18:52:50 2012
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flumeog.appender;
+
+import org.apache.logging.log4j.core.LogEvent;
+
+/**
+ * Factory to create Flume events.
+ */
+public interface FlumeEventFactory {
+ /**
+ * Create a Flume event.
+ * @param event The Log4j LogEvent.
+ * @param hostname The name of the host.
+ * @param includes A comma separated list of MDC elements to include.
+ * @param excludes A comma separated list of MDC elements to exclude.
+ * @param required A comma separated list of MDC elements that are required.
+ * @param mdcPrefix The value to prefix to MDC keys.
+ * @param eventPrefix The value to prefix to event keys.
+ * @param compress If true the event body should be compressed.
+ * @return A FlumeEvent.
+ */
+ FlumeEvent createEvent(LogEvent event, String hostname, String includes, String excludes, String required,
+ String mdcPrefix, String eventPrefix, boolean compress);
+}
Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/package-info.java
URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/package-info.java?rev=1226514&view=auto
==============================================================================
--- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/package-info.java (added)
+++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/package-info.java Mon Jan 2 18:52:50 2012
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+/**
+ * Apache Flume Appender. Requires the user specifically include Flume and its dependencies.
+ */
+package org.apache.logging.log4j.flumeog.appender;
Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/test/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroAppenderTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/test/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroAppenderTest.java?rev=1226514&view=auto
==============================================================================
--- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/test/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroAppenderTest.java (added)
+++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/test/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroAppenderTest.java Mon Jan 2 18:52:50 2012
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flumeog.appender;
+
+import com.cloudera.flume.core.Event;
+import com.cloudera.flume.handlers.avro.AvroEventSource;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.Logger;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.zip.GZIPInputStream;
+
+/**
+ *
+ */
+public class FlumeAvroAppenderTest {
+
+ private LoggerContext ctx = (LoggerContext) LogManager.getContext();
+
+ private static final String LOGBACK_CONF = "logback.configurationFile";
+ private static final String LOGBACK_CONFIG = "logback-flume.xml";
+
+ private static final int testServerPort = 12345;
+ private static final int testEventCount = 100;
+
+ private AvroEventSource eventSource;
+ private Logger avroLogger;
+
+ @BeforeClass
+ public static void setupClass() {
+ System.setProperty(LOGBACK_CONF, LOGBACK_CONFIG);
+ }
+
+ @AfterClass
+ public static void cleanupClass() {
+ System.clearProperty(LOGBACK_CONF);
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ eventSource = new AvroEventSource(testServerPort);
+ avroLogger = (Logger) LogManager.getLogger("avrologger");
+ /*
+ * Clear out all other appenders associated with this logger to ensure we're
+ * only hitting the Avro appender.
+ */
+ removeAppenders(avroLogger);
+ eventSource.open();
+ }
+
+ @After
+ public void teardown() throws IOException {
+ removeAppenders(avroLogger);
+ eventSource.close();
+ }
+
+ @Test
+ public void testLog4jAvroAppender() throws InterruptedException, IOException {
+ Agent[] agents = new Agent[] {Agent.createAgent("localhost", Integer.toString(testServerPort))};
+ FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null,
+ null, null, null, null, "true", null, null, null);
+ avroAppender.start();
+ avroLogger.addAppender(avroAppender);
+ avroLogger.setLevel(Level.ALL);
+
+ Assert.assertNotNull(avroLogger);
+
+ int loggedCount = 0;
+ int receivedCount = 0;
+
+ for (int i = 0; i < testEventCount; i++) {
+ avroLogger.info("test i:" + i);
+ loggedCount++;
+ }
+
+ /*
+ * We perform this in another thread so we can put a time SLA on it by using
+ * Future#get(). Internally, the AvroEventSource uses a BlockingQueue.
+ */
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ Callable<Event> callable = new Callable<Event>() {
+
+ public Event call() throws Exception {
+ return eventSource.next();
+ }
+ };
+
+ for (int i = 0; i < loggedCount; i++) {
+ try {
+ Future<Event> future = executor.submit(callable);
+
+ /*
+ * We must receive events in less than 1 second. This should be more
+ * than enough as all events should be held in AvroEventSource's
+ * BlockingQueue.
+ */
+ Event event = future.get(1, TimeUnit.SECONDS);
+
+ Assert.assertNotNull(event);
+ Assert.assertNotNull(event.getBody());
+ String body = getBody(event);
+ Assert.assertTrue(body.endsWith("test i:" + i));
+
+ receivedCount++;
+ } catch (ExecutionException e) {
+ Assert.fail("Flume failed to handle an event: " + e.getMessage());
+ break;
+ } catch (TimeoutException e) {
+ Assert
+ .fail("Flume failed to handle an event within the given time SLA: "
+ + e.getMessage());
+ break;
+ } catch (InterruptedException e) {
+ Assert
+ .fail("Flume source executor thread was interrupted. We count this as a failure.");
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+
+ executor.shutdown();
+
+ if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+ throw new IllegalStateException(
+ "Executor is refusing to shutdown cleanly");
+ }
+
+ Assert.assertEquals(loggedCount, receivedCount);
+ }
+
+ @Test
+ public void testConnectionRefused() {
+ Agent[] agents = new Agent[] {Agent.createAgent("localhost", Integer.toString(44000))};
+ FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null,
+ null, null, null, null, "true", null, null, null);
+ avroAppender.start();
+ avroLogger.addAppender(avroAppender);
+ avroLogger.setLevel(Level.ALL);
+
+ boolean caughtException = false;
+
+ try {
+ avroLogger.info("message 1");
+ } catch (Throwable t) {
+ //logger.debug("Logging to a non-existant server failed (as expected)", t);
+
+ caughtException = true;
+ }
+
+ Assert.assertTrue(caughtException);
+ }
+
+ @Test
+ public void testReconnect() throws IOException {
+ Agent[] agents = new Agent[] {Agent.createAgent("localhost", Integer.toString(testServerPort))};
+ FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "500", "10", "avro", "false", null,
+ null, null, null, null, "true", null, null, null);
+ avroAppender.start();
+ avroLogger.addAppender(avroAppender);
+ avroLogger.setLevel(Level.ALL);
+ avroLogger.info("message 1");
+
+ Event event = eventSource.next();
+
+ Assert.assertNotNull(event);
+ String body = getBody(event);
+ Assert.assertTrue(body.endsWith("message 1"));
+
+ eventSource.close();
+
+ Callable<Void> logCallable = new Callable<Void>() {
+
+ public Void call() throws Exception {
+ avroLogger.info("message 2");
+ return null;
+ }
+ };
+
+ ExecutorService logExecutor = Executors.newSingleThreadExecutor();
+
+ boolean caughtException = false;
+
+ try {
+ logExecutor.submit(logCallable);
+
+ Thread.sleep(1500);
+
+ eventSource.open();
+
+ logExecutor.shutdown();
+
+ if (!logExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+ throw new IllegalStateException(
+ "Log executor is refusing to shutdown cleanly");
+ }
+ } catch (Throwable t) {
+ System.err.println("Failed to reestablish a connection and log to an avroSource");
+
+ caughtException = true;
+ }
+
+ Assert.assertFalse(caughtException);
+
+ event = eventSource.next();
+
+ Assert.assertNotNull(event);
+ body = getBody(event);
+ Assert.assertTrue(body.endsWith("message 2"));
+
+ caughtException = false;
+
+ try {
+ avroLogger.info("message 3");
+ } catch (Throwable t) {
+ System.err.println("Logging to a closed server failed (not expected)");
+
+ caughtException = true;
+ }
+
+ Assert.assertFalse(caughtException);
+
+ event = eventSource.next();
+
+ Assert.assertNotNull(event);
+ body = getBody(event);
+ Assert.assertTrue(body.endsWith("message 3"));
+ }
+
+
+ private void removeAppenders(Logger logger) {
+ Map<String,Appender> map = logger.getAppenders();
+ for (Map.Entry<String, Appender> entry : map.entrySet()) {
+ Appender app = entry.getValue();
+ avroLogger.removeAppender(app);
+ app.stop();
+ }
+ }
+
+ private Appender getAppender(Logger logger, String name) {
+ Map<String,Appender> map = logger.getAppenders();
+ return map.get(name);
+ }
+
+ private String getBody(Event event) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody()));
+ int n = 0;
+ while (-1 != (n = is.read())) {
+ baos.write(n);
+ }
+ return new String(baos.toByteArray());
+
+ }
+}
Modified: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/pom.xml
URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/pom.xml?rev=1226514&r1=1226513&r2=1226514&view=diff
==============================================================================
--- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/pom.xml (original)
+++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/pom.xml Mon Jan 2 18:52:50 2012
@@ -232,59 +232,53 @@
</plugin> -->
</plugins>
</build>
- <reporting>
- <plugins>
- <!-- Changes report -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-changes-plugin</artifactId>
- <version>2.6</version>
- <reportSets>
- <reportSet>
- <reports>
- <report>changes-report</report>
- <report>jira-report</report>
- </reports>
- </reportSet>
- </reportSets>
- <configuration>
- <statusIds>Resolved, Closed</statusIds>
- <columnNames>Type,Key,Summary,Assignee,Status,Resolution,Fix Version</columnNames>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-project-info-reports-plugin</artifactId>
- <version>2.4</version>
- </plugin>
- <!-- Surefire report -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-report-plugin</artifactId>
- <version>2.11</version>
- </plugin>
+ <reporting>
+ <plugins>
+ <!-- Changes report -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-changes-plugin</artifactId>
+ <version>2.6</version>
+ <reportSets>
+ <reportSet>
+ <reports>
+ <report>changes-report</report>
+ <report>jira-report</report>
+ </reports>
+ </reportSet>
+ </reportSets>
+ <configuration>
+ <statusIds>Resolved, Closed</statusIds>
+ <columnNames>Type,Key,Summary,Assignee,Status,Resolution,Fix Version</columnNames>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-project-info-reports-plugin</artifactId>
+ <version>2.4</version>
+ </plugin>
+ <!-- Surefire report -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-report-plugin</artifactId>
+ <version>2.11</version>
+ </plugin>
- <!-- RAT report -->
- <plugin>
- <groupId>org.apache.rat</groupId>
- <artifactId>apache-rat-plugin</artifactId>
- <version>0.8</version>
- <configuration>
- <excludes>
- <exclude>.idea/**/*</exclude>
- <exclude>src/test/resources/**/*</exclude>
- </excludes>
- </configuration>
- </plugin>
- </plugins>
- </reporting>
+ <!-- RAT report -->
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <version>0.8</version>
+ <configuration>
+ <excludes>
+ <exclude>.idea/**/*</exclude>
+ <exclude>src/test/resources/**/*</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </reporting>
- <repositories>
- <repository>
- <id>cloudera</id>
- <url>https://repository.cloudera.com/content/repositories/releases/</url>
- </repository>
- </repositories>
<distributionManagement>
<repository>
<id>apache.releases.https</id>
@@ -308,4 +302,25 @@
<module>slf4j-impl</module>
<module>log4j2-jcl</module>
</modules>
+ <profiles>
+ <profile>
+ <id>include-flume</id>
+ <modules>
+ <module>log4j2-flume-og</module>
+ <module>log4j2-flume-ng</module>
+ </modules>
+ </profile>
+ <profile>
+ <id>include-flume-ng</id>
+ <modules>
+ <module>log4j2-flume-ng</module>
+ </modules>
+ </profile>
+ <profile>
+ <id>include-flume-og</id>
+ <modules>
+ <module>log4j2-flume-og</module>
+ </modules>
+ </profile>
+ </profiles>
</project>