Posted to commits@logging.apache.org by rg...@apache.org on 2012/01/02 19:52:52 UTC

svn commit: r1226514 [2/2] - in /logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers: ./ log4j2-core/ log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/ log4j2-core/src/test/java/org/apache/logging/log4j/core/appender/flume/ log4...

Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroManager.java?rev=1226514&view=auto
==============================================================================
--- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroManager.java (added)
+++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroManager.java Mon Jan  2 18:52:50 2012
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flumeog.appender;
+
+import com.cloudera.flume.handlers.avro.AvroFlumeEvent;
+import org.apache.avro.ipc.HttpTransceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.logging.log4j.core.appender.AbstractManager;
+import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
+import org.apache.logging.log4j.core.appender.ManagerFactory;
+
+import com.cloudera.flume.handlers.avro.FlumeEventAvroServer;
+import com.cloudera.flume.handlers.avro.AvroEventConvertUtil;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+/**
+ * Manager for FlumeAvroAppenders.
+ */
+public class FlumeAvroManager extends AbstractManager {
+
+    /**
+     * The default reconnection delay (500 milliseconds or .5 seconds).
+     */
+    public static final int DEFAULT_RECONNECTION_DELAY   = 500;
+
+    private static final int DEFAULT_RECONNECTS = 3;
+
+    private static ManagerFactory factory = new AvroManagerFactory();
+
+    private FlumeEventAvroServer client;
+
+    private final Agent[] agents;
+
+    private int current = 0;
+
+    protected FlumeAvroManager(String name, Agent[] agents) {
+        super(name);
+        this.agents = agents;
+        this.client = connect(agents);
+    }
+
+    /**
+     * Return a FlumeAvroManager.
+     * @param agents The agents to use.
+     * @return A FlumeAvroManager.
+     */
+    public static FlumeAvroManager getManager(Agent[] agents) {
+        if (agents == null || agents.length == 0) {
+            throw new IllegalArgumentException("At least one agent is required");
+        }
+
+        StringBuilder sb = new StringBuilder("FlumeAvro[");
+        boolean first = true;
+        for (Agent agent : agents) {
+            if (!first) {
+                sb.append(",");
+            }
+            sb.append(agent.getHost()).append(":").append(agent.getPort());
+            first = false;
+        }
+        sb.append("]");
+        return (FlumeAvroManager) getManager(sb.toString(), factory, new FactoryData(agents));
+    }
+
+    /**
+     * Return the agents.
+     * @return The agent array.
+     */
+    public Agent[] getAgents() {
+        return agents;
+    }
+
+    /**
+     * Returns the index of the current agent.
+     * @return The index for the current agent.
+     */
+    public int getCurrent() {
+        return current;
+    }
+
+    protected synchronized void send(FlumeEvent event, int delay, int retries)  {
+        if (delay == 0) {
+            delay = DEFAULT_RECONNECTION_DELAY;
+        }
+        if (retries == 0) {
+            retries = DEFAULT_RECONNECTS;
+        }
+        AvroFlumeEvent avroEvent = AvroEventConvertUtil.toAvroEvent(event);
+        int i = 0;
+
+        String msg = "Error writing to " + getName();
+
+        do {
+            try {
+                client.append(avroEvent);
+                return;
+            } catch (Exception ex) {
+                if (i == retries - 1) {
+                    msg = "Error writing to " + getName() + " at " + agents[0].getHost() + ":" + agents[0].getPort();
+                    LOGGER.warn(msg, ex);
+                    break;
+                }
+                sleep(delay);
+            }
+        } while (++i < retries);
+
+        for (int index = 0; index < agents.length; ++index) {
+            if (index == current) {
+                continue;
+            }
+            Agent agent = agents[index];
+            i = 0;
+            do {
+                try {
+
+                    FlumeEventAvroServer c = connect(agent.getHost(), agent.getPort());
+                    c.append(avroEvent);
+                    client = c;
+                    current = index;
+                    return;
+                } catch (Exception ex) {
+                    if (i == retries - 1) {
+                        String warnMsg = "Error writing to " + getName() + " at " + agent.getHost() + ":" +
+                            agent.getPort();
+                        LOGGER.warn(warnMsg, ex);
+                        break;
+                    }
+                    sleep(delay);
+                }
+            } while (++i < retries);
+        }
+
+        throw new AppenderRuntimeException(msg);
+
+    }
+
+    private void sleep(int delay) {
+        try {
+            Thread.sleep(delay);
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * This will very likely return the first agent even if it isn't available, since creating the client does not verify connectivity.
+     * @param agents The list of agents to choose from
+     * @return The FlumeEventAvroServer.
+     */
+    private FlumeEventAvroServer connect(Agent[] agents) {
+        int i = 0;
+        for (Agent agent : agents) {
+            FlumeEventAvroServer server = connect(agent.getHost(), agent.getPort());
+            if (server != null) {
+                current = i;
+                return server;
+            }
+            ++i;
+        }
+        throw new AppenderRuntimeException("Unable to connect to any agents");
+    }
+
+    private FlumeEventAvroServer connect(String hostname, int port) {
+        URL url;
+
+        try {
+            url = new URL("http", hostname, port, "/");
+        } catch (MalformedURLException ex) {
+            LOGGER.error("Unable to create a URL for hostname " + hostname + " at port " + port, ex);
+            return null;
+        }
+
+        try {
+            return SpecificRequestor.getClient(FlumeEventAvroServer.class, new HttpTransceiver(url));
+        } catch (IOException ioe) {
+            LOGGER.error("Unable to create Avro client", ioe);
+            return null;
+        }
+    }
+
+    /**
+     * Factory data.
+     */
+    private static class FactoryData {
+        private Agent[] agents;
+
+        /**
+         * Constructor.
+         * @param agents The agents.
+         */
+        public FactoryData(Agent[] agents) {
+            this.agents = agents;
+        }
+    }
+
+    /**
+     * Avro Manager Factory.
+     */
+    private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
+
+        /**
+         * Create the FlumeAvroManager.
+         * @param name The name of the entity to manage.
+         * @param data The data required to create the entity.
+         * @return The FlumeAvroManager.
+         */
+        public FlumeAvroManager createManager(String name, FactoryData data) {
+            try {
+
+                return new FlumeAvroManager(name, data.agents);
+            } catch (Exception ex) {
+                LOGGER.error("Could not create FlumeAvroManager", ex);
+            }
+            return null;
+        }
+    }
+
+}
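
A minimal usage sketch of the manager added above (illustrative only, not part of this commit). Agent.createAgent(host, port) is taken from the test code added later in this commit; the class name, host names and port below are placeholders.

    package org.apache.logging.log4j.flumeog.appender;

    public final class FlumeAvroManagerExample {
        public static void main(String[] args) {
            // Hypothetical: both agents are placeholders; the second acts as a failover target.
            Agent[] agents = new Agent[] {
                Agent.createAgent("localhost", "35853"),
                Agent.createAgent("backup.example.com", "35853")
            };
            // Managers are shared by name; the name is derived from the agent list.
            FlumeAvroManager manager = FlumeAvroManager.getManager(agents);
            System.out.println("Current agent index: " + manager.getCurrent());
            // send() appends to the current agent, retrying and then failing over to the
            // remaining agents before throwing an AppenderRuntimeException.
        }
    }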

Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEvent.java
URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEvent.java?rev=1226514&view=auto
==============================================================================
--- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEvent.java (added)
+++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEvent.java Mon Jan  2 18:52:50 2012
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flumeog.appender;
+
+import com.cloudera.flume.core.EventBaseImpl;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LoggingException;
+import org.apache.logging.log4j.Marker;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.helpers.UUIDUtil;
+import org.apache.logging.log4j.message.MapMessage;
+import org.apache.logging.log4j.message.Message;
+import org.apache.logging.log4j.message.StructuredDataId;
+import org.apache.logging.log4j.message.StructuredDataMessage;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Class that is both a Flume and Log4j Event.
+ */
+public class FlumeEvent extends EventBaseImpl implements LogEvent {
+
+    private static final String DEFAULT_MDC_PREFIX = "mdc:";
+
+    private static final String DEFAULT_EVENT_PREFIX = "";
+
+    private static final String EVENT_TYPE = "eventType";
+
+    private static final String EVENT_ID = "eventId";
+
+    private static final String GUID = "guId";
+
+    private final LogEvent event;
+
+    private byte[] body;
+
+    private final String hostname;
+
+    private final Map<String, String> ctx = new HashMap<String, String>();
+
+    private final boolean compress;
+
+    /**
+     * Construct the FlumeEvent.
+     * @param event The Log4j LogEvent.
+     * @param hostname The host name.
+     * @param includes A comma separated list of MDC elements to include.
+     * @param excludes A comma separated list of MDC elements to exclude.
+     * @param required A comma separated list of MDC elements that are required to be defined.
+     * @param mdcPrefix The value to prefix to MDC keys.
+     * @param eventPrefix The value to prefix to event keys.
+     * @param compress If true the event body should be compressed.
+     */
+    public FlumeEvent(LogEvent event, String hostname, String includes, String excludes, String required,
+                      String mdcPrefix, String eventPrefix, boolean compress) {
+        this.event = event;
+        this.hostname = hostname;
+        this.compress = compress;
+        if (mdcPrefix == null) {
+            mdcPrefix = DEFAULT_MDC_PREFIX;
+        }
+        if (eventPrefix == null) {
+            eventPrefix = DEFAULT_EVENT_PREFIX;
+        }
+        this.fields = new HashMap<String, byte[]>();
+        Map<String, String> mdc = event.getContextMap();
+        if (includes != null) {
+            String[] array = includes.split(",");
+            if (array.length > 0) {
+                for (String str : array) {
+                    if (mdc.containsKey(str)) {
+                        ctx.put(str, mdc.get(str));
+                    }
+                }
+            }
+        } else if (excludes != null) {
+            String[] array = excludes.split(",");
+            if (array.length > 0) {
+                List<String> list = Arrays.asList(array);
+                for (Map.Entry<String, String> entry : mdc.entrySet()) {
+                    if (!list.contains(entry.getKey())) {
+                        ctx.put(entry.getKey(), entry.getValue());
+                    }
+                }
+            }
+        }
+
+        if (required != null) {
+            String[] array = required.split(",");
+            if (array.length > 0) {
+                for (String str : array) {
+                    if (!mdc.containsKey(str)) {
+                        throw new LoggingException("Required key " + str + " is missing from the MDC");
+                    }
+                }
+            }
+        }
+        Message message = event.getMessage();
+        if (message instanceof MapMessage) {
+            if (message instanceof StructuredDataMessage) {
+                addStructuredData(eventPrefix, fields, (StructuredDataMessage) message);
+            }
+            addMapData(eventPrefix, fields, (MapMessage) message);
+        }
+
+        addContextData(mdcPrefix, fields, ctx);
+
+        addGuid(fields);
+    }
+
+    protected void addStructuredData(String prefix, Map<String, byte[]> fields, StructuredDataMessage msg) {
+        fields.put(prefix + EVENT_TYPE, msg.getType().getBytes());
+        StructuredDataId id = msg.getId();
+        fields.put(prefix + EVENT_ID, id.getName().getBytes());
+    }
+
+    protected void addMapData(String prefix, Map<String, byte[]> fields, MapMessage msg) {
+        Map<String, String> data = msg.getData();
+        for (Map.Entry<String, String> entry : data.entrySet()) {
+            fields.put(prefix + entry.getKey(), entry.getValue().getBytes());
+        }
+    }
+
+    protected void addContextData(String prefix, Map<String, byte[]> fields, Map<String, String> context) {
+        for (Map.Entry<String, String> entry : context.entrySet()) {
+            fields.put(prefix + entry.getKey(), entry.getValue().getBytes());
+        }
+    }
+
+    protected void addGuid(Map<String, byte[]> fields) {
+        fields.put(GUID, UUIDUtil.getTimeBasedUUID().toString().getBytes());
+    }
+
+    /**
+     * Set the body in the event.
+     * @param body The body to add to the event.
+     */
+    public void setBody(byte[] body) {
+        if (body == null || body.length == 0) {
+            this.body = new byte[0];
+            return;
+        }
+        if (compress) {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            try {
+                GZIPOutputStream os = new GZIPOutputStream(baos);
+                os.write(body);
+                os.close();
+            } catch (IOException ioe) {
+                throw new LoggingException("Unable to compress message", ioe);
+            }
+            this.body = baos.toByteArray();
+        } else {
+            this.body = body;
+        }
+    }
+
+    @Override
+    public byte[] getBody() {
+        return this.body;
+    }
+
+    @Override
+    public Priority getPriority() {
+        switch (event.getLevel()) {
+            case INFO:
+                return Priority.INFO;
+            case ERROR:
+                return Priority.ERROR;
+            case DEBUG:
+                return Priority.DEBUG;
+            case WARN:
+                return Priority.WARN;
+            case TRACE:
+                return Priority.TRACE;
+            case FATAL:
+                return Priority.FATAL;
+        }
+        return Priority.INFO;
+    }
+
+    /**
+     * Get the Fully Qualified Class Name.
+     * @return the FQCN String.
+     */
+    public String getFQCN() {
+        return event.getFQCN();
+    }
+
+    @Override
+    public long getTimestamp() {
+        return event.getMillis();
+    }
+
+    @Override
+    public long getNanos() {
+        return System.nanoTime();
+    }
+
+    @Override
+    public String getHost() {
+        return hostname;
+    }
+
+    /**
+     * Return the logging Level.
+     * @return the Level.
+     */
+    public Level getLevel() {
+        return event.getLevel();
+    }
+
+    /**
+     * Return the logger name.
+     * @return the logger name.
+     */
+    public String getLoggerName() {
+        return event.getLoggerName();
+    }
+
+    /**
+     * Return the StackTraceElement for the caller of the logging API.
+     * @return the StackTraceElement of the caller.
+     */
+    public StackTraceElement getSource() {
+        return event.getSource();
+    }
+
+    /**
+     * Return the Message.
+     * @return the Message.
+     */
+    public Message getMessage() {
+        return event.getMessage();
+    }
+
+    /**
+     * Return the Marker.
+     * @return the Marker.
+     */
+    public Marker getMarker() {
+        return event.getMarker();
+    }
+
+    /**
+     * Return the name of the Thread.
+     * @return the name of the Thread.
+     */
+    public String getThreadName() {
+        return event.getThreadName();
+    }
+
+    /**
+     * Return the event timestamp.
+     * @return the event timestamp.
+     */
+    public long getMillis() {
+        return event.getMillis();
+    }
+
+    /**
+     * Return the Throwable associated with the event, if any.
+     * @return the Throwable.
+     */
+    public Throwable getThrown() {
+        return event.getThrown();
+    }
+
+    /**
+     * Return a copy of the context Map.
+     * @return a copy of the context Map.
+     */
+    public Map<String, String> getContextMap() {
+        return ctx;
+    }
+
+    /**
+     * Return a copy of the context stack.
+     * @return a copy of the context stack.
+     */
+    public Stack<String> getContextStack() {
+        return event.getContextStack();
+    }
+}

Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEventFactory.java
URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEventFactory.java?rev=1226514&view=auto
==============================================================================
--- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEventFactory.java (added)
+++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEventFactory.java Mon Jan  2 18:52:50 2012
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flumeog.appender;
+
+import org.apache.logging.log4j.core.LogEvent;
+
+/**
+ * Factory to create Flume events.
+ */
+public interface FlumeEventFactory {
+    /**
+     * Create a Flume event.
+     * @param event The Log4j LogEvent.
+     * @param hostname The name of the host.
+     * @param includes A comma separated list of MDC elements to include.
+     * @param excludes A comma separated list of MDC elements to exclude.
+     * @param required A comma separated list of MDC elements that are required.
+     * @param mdcPrefix The value to prefix to MDC keys.
+     * @param eventPrefix The value to prefix to event keys.
+     * @param compress If true the event body should be compressed.
+     * @return A FlumeEvent.
+     */
+    FlumeEvent createEvent(LogEvent event, String hostname, String includes, String excludes, String required,
+                      String mdcPrefix, String eventPrefix, boolean compress);
+}
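
A possible implementation of the factory above (not part of this commit) would simply delegate to the FlumeEvent constructor added earlier; a minimal sketch, with an illustrative class name:

    package org.apache.logging.log4j.flumeog.appender;

    import org.apache.logging.log4j.core.LogEvent;

    public class SimpleFlumeEventFactory implements FlumeEventFactory {
        // Delegates straight to the FlumeEvent constructor shown earlier in this commit.
        public FlumeEvent createEvent(LogEvent event, String hostname, String includes, String excludes,
                                      String required, String mdcPrefix, String eventPrefix, boolean compress) {
            return new FlumeEvent(event, hostname, includes, excludes, required, mdcPrefix, eventPrefix, compress);
        }
    }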

Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/package-info.java
URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/package-info.java?rev=1226514&view=auto
==============================================================================
--- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/package-info.java (added)
+++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/package-info.java Mon Jan  2 18:52:50 2012
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+/**
+ * Apache Flume Appender. Requires that the user explicitly include Flume and its dependencies.
+ */
+package org.apache.logging.log4j.flumeog.appender;

Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/test/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroAppenderTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/test/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroAppenderTest.java?rev=1226514&view=auto
==============================================================================
--- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/test/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroAppenderTest.java (added)
+++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/test/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroAppenderTest.java Mon Jan  2 18:52:50 2012
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flumeog.appender;
+
+import com.cloudera.flume.core.Event;
+import com.cloudera.flume.handlers.avro.AvroEventSource;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.Logger;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.zip.GZIPInputStream;
+
+/**
+ *
+ */
+public class FlumeAvroAppenderTest {
+
+    private LoggerContext ctx = (LoggerContext) LogManager.getContext();
+
+    private static final String LOGBACK_CONF = "logback.configurationFile";
+    private static final String LOGBACK_CONFIG = "logback-flume.xml";
+
+    private static final int testServerPort = 12345;
+    private static final int testEventCount = 100;
+
+    private AvroEventSource eventSource;
+    private Logger avroLogger;
+
+    @BeforeClass
+    public static void setupClass() {
+        System.setProperty(LOGBACK_CONF, LOGBACK_CONFIG);
+    }
+
+    @AfterClass
+    public static void cleanupClass() {
+        System.clearProperty(LOGBACK_CONF);
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        eventSource = new AvroEventSource(testServerPort);
+        avroLogger = (Logger) LogManager.getLogger("avrologger");
+        /*
+        * Clear out all other appenders associated with this logger to ensure we're
+        * only hitting the Avro appender.
+        */
+        removeAppenders(avroLogger);
+        eventSource.open();
+    }
+
+    @After
+    public void teardown() throws IOException {
+        removeAppenders(avroLogger);
+        eventSource.close();
+    }
+
+    @Test
+    public void testLog4jAvroAppender() throws InterruptedException, IOException {
+        Agent[] agents = new Agent[] {Agent.createAgent("localhost", Integer.toString(testServerPort))};
+        FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null,
+            null, null, null, null, "true", null, null, null);
+        avroAppender.start();
+        avroLogger.addAppender(avroAppender);
+        avroLogger.setLevel(Level.ALL);
+
+        Assert.assertNotNull(avroLogger);
+
+        int loggedCount = 0;
+        int receivedCount = 0;
+
+        for (int i = 0; i < testEventCount; i++) {
+            avroLogger.info("test i:" + i);
+            loggedCount++;
+        }
+
+        /*
+        * We perform this in another thread so we can put a time SLA on it by using
+        * Future#get(). Internally, the AvroEventSource uses a BlockingQueue.
+        */
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        Callable<Event> callable = new Callable<Event>() {
+
+            public Event call() throws Exception {
+                return eventSource.next();
+            }
+        };
+
+        for (int i = 0; i < loggedCount; i++) {
+            try {
+                Future<Event> future = executor.submit(callable);
+
+                /*
+                * We must receive events in less than 1 second. This should be more
+                * than enough as all events should be held in AvroEventSource's
+                * BlockingQueue.
+                */
+                Event event = future.get(1, TimeUnit.SECONDS);
+
+                Assert.assertNotNull(event);
+                Assert.assertNotNull(event.getBody());
+                String body = getBody(event);
+                Assert.assertTrue(body.endsWith("test i:" + i));
+
+                receivedCount++;
+            } catch (ExecutionException e) {
+                Assert.fail("Flume failed to handle an event: " + e.getMessage());
+                break;
+            } catch (TimeoutException e) {
+                Assert
+                    .fail("Flume failed to handle an event within the given time SLA: "
+                        + e.getMessage());
+                break;
+            } catch (InterruptedException e) {
+                Assert
+                    .fail("Flume source executor thread was interrupted. We count this as a failure.");
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+
+        executor.shutdown();
+
+        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+            throw new IllegalStateException(
+                "Executor is refusing to shutdown cleanly");
+        }
+
+        Assert.assertEquals(loggedCount, receivedCount);
+    }
+
+    @Test
+    public void testConnectionRefused() {
+        Agent[] agents = new Agent[] {Agent.createAgent("localhost", Integer.toString(44000))};
+        FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null,
+            null, null, null, null, "true", null, null, null);
+        avroAppender.start();
+        avroLogger.addAppender(avroAppender);
+        avroLogger.setLevel(Level.ALL);
+
+        boolean caughtException = false;
+
+        try {
+            avroLogger.info("message 1");
+        } catch (Throwable t) {
+            //logger.debug("Logging to a non-existant server failed (as expected)", t);
+
+            caughtException = true;
+        }
+
+        Assert.assertTrue(caughtException);
+    }
+
+    @Test
+    public void testReconnect() throws IOException {
+        Agent[] agents = new Agent[] {Agent.createAgent("localhost", Integer.toString(testServerPort))};
+        FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "500", "10", "avro", "false", null,
+            null, null, null, null, "true", null, null, null);
+        avroAppender.start();
+        avroLogger.addAppender(avroAppender);
+        avroLogger.setLevel(Level.ALL);
+        avroLogger.info("message 1");
+
+        Event event = eventSource.next();
+
+        Assert.assertNotNull(event);
+        String body = getBody(event);
+        Assert.assertTrue(body.endsWith("message 1"));
+
+        eventSource.close();
+
+        Callable<Void> logCallable = new Callable<Void>() {
+
+            public Void call() throws Exception {
+                avroLogger.info("message 2");
+                return null;
+            }
+        };
+
+        ExecutorService logExecutor = Executors.newSingleThreadExecutor();
+
+        boolean caughtException = false;
+
+        try {
+            logExecutor.submit(logCallable);
+
+            Thread.sleep(1500);
+
+            eventSource.open();
+
+            logExecutor.shutdown();
+
+            if (!logExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+                throw new IllegalStateException(
+                    "Log executor is refusing to shutdown cleanly");
+            }
+        } catch (Throwable t) {
+            System.err.println("Failed to reestablish a connection and log to an avroSource");
+
+            caughtException = true;
+        }
+
+        Assert.assertFalse(caughtException);
+
+        event = eventSource.next();
+
+        Assert.assertNotNull(event);
+        body = getBody(event);
+        Assert.assertTrue(body.endsWith("message 2"));
+
+        caughtException = false;
+
+        try {
+            avroLogger.info("message 3");
+        } catch (Throwable t) {
+            System.err.println("Logging to a closed server failed (not expected)");
+
+            caughtException = true;
+        }
+
+        Assert.assertFalse(caughtException);
+
+        event = eventSource.next();
+
+        Assert.assertNotNull(event);
+        body = getBody(event);
+        Assert.assertTrue(body.endsWith("message 3"));
+    }
+
+
+    private void removeAppenders(Logger logger) {
+        Map<String,Appender> map = logger.getAppenders();
+        for (Map.Entry<String, Appender> entry : map.entrySet()) {
+            Appender app = entry.getValue();
+            logger.removeAppender(app);
+            app.stop();
+        }
+    }
+
+    private Appender getAppender(Logger logger, String name) {
+        Map<String,Appender> map = logger.getAppenders();
+        return map.get(name);
+    }
+
+    private String getBody(Event event) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody()));
+        int n = 0;
+        while (-1 != (n = is.read())) {
+            baos.write(n);
+        }
+        is.close();
+        return new String(baos.toByteArray());
+    }
+}

Modified: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/pom.xml
URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/pom.xml?rev=1226514&r1=1226513&r2=1226514&view=diff
==============================================================================
--- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/pom.xml (original)
+++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/pom.xml Mon Jan  2 18:52:50 2012
@@ -232,59 +232,53 @@
       </plugin> -->
     </plugins>
   </build>
-    <reporting>
-      <plugins>
-        <!-- Changes report -->
-        <plugin>
-          <groupId>org.apache.maven.plugins</groupId>
-          <artifactId>maven-changes-plugin</artifactId>
-          <version>2.6</version>
-          <reportSets>
-            <reportSet>
-              <reports>
-                <report>changes-report</report>
-                <report>jira-report</report>
-              </reports>
-            </reportSet>
-          </reportSets>
-          <configuration>
-            <statusIds>Resolved, Closed</statusIds>
-            <columnNames>Type,Key,Summary,Assignee,Status,Resolution,Fix Version</columnNames>
-          </configuration>
-        </plugin>
-        <plugin>
-          <groupId>org.apache.maven.plugins</groupId>
-          <artifactId>maven-project-info-reports-plugin</artifactId>
-          <version>2.4</version>
-        </plugin>
-        <!-- Surefire report -->
-        <plugin>
-          <groupId>org.apache.maven.plugins</groupId>
-          <artifactId>maven-surefire-report-plugin</artifactId>
-          <version>2.11</version>
-        </plugin>
+  <reporting>
+    <plugins>
+      <!-- Changes report -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-changes-plugin</artifactId>
+        <version>2.6</version>
+        <reportSets>
+          <reportSet>
+            <reports>
+              <report>changes-report</report>
+              <report>jira-report</report>
+            </reports>
+          </reportSet>
+        </reportSets>
+        <configuration>
+          <statusIds>Resolved, Closed</statusIds>
+          <columnNames>Type,Key,Summary,Assignee,Status,Resolution,Fix Version</columnNames>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-project-info-reports-plugin</artifactId>
+        <version>2.4</version>
+      </plugin>
+      <!-- Surefire report -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+        <version>2.11</version>
+      </plugin>
 
-        <!-- RAT report -->
-        <plugin>
-          <groupId>org.apache.rat</groupId>
-          <artifactId>apache-rat-plugin</artifactId>
-          <version>0.8</version>
-          <configuration>
-            <excludes>
-              <exclude>.idea/**/*</exclude>
-              <exclude>src/test/resources/**/*</exclude>
-            </excludes>
-          </configuration>
-        </plugin>
-      </plugins>
-    </reporting>
+      <!-- RAT report -->
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <version>0.8</version>
+        <configuration>
+          <excludes>
+            <exclude>.idea/**/*</exclude>
+            <exclude>src/test/resources/**/*</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
+    </plugins>
+  </reporting>
 
-  <repositories>
-    <repository>
-      <id>cloudera</id>
-      <url>https://repository.cloudera.com/content/repositories/releases/</url>
-    </repository>
-  </repositories>
   <distributionManagement>
      <repository>
       <id>apache.releases.https</id>
@@ -308,4 +302,25 @@
     <module>slf4j-impl</module>
     <module>log4j2-jcl</module>
   </modules>
+  <profiles>
+    <profile>
+      <id>include-flume</id>
+      <modules>
+        <module>log4j2-flume-og</module>
+        <module>log4j2-flume-ng</module>
+      </modules>
+    </profile>
+    <profile>
+      <id>include-flume-ng</id>
+      <modules>
+        <module>log4j2-flume-ng</module>
+      </modules>
+    </profile>
+    <profile>
+      <id>include-flume-og</id>
+      <modules>
+        <module>log4j2-flume-og</module>
+      </modules>
+    </profile>
+  </profiles>
 </project>
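
The new Flume modules are only built when one of the profiles added above is activated; assuming a standard Maven invocation from the branch root, for example:

    mvn -P include-flume-og install    # build log4j2-flume-og only
    mvn -P include-flume-ng install    # build log4j2-flume-ng only
    mvn -P include-flume install       # build both Flume modules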