You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by gg...@apache.org on 2015/09/10 01:01:37 UTC

logging-log4j2 git commit: [LOG4J2-1113] New publisher Appender for ZeroMQ (using JeroMQ).

Repository: logging-log4j2
Updated Branches:
  refs/heads/master ce1668592 -> 80c9dcbea


[LOG4J2-1113] New publisher Appender for ZeroMQ (using JeroMQ).

Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/80c9dcbe
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/80c9dcbe
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/80c9dcbe

Branch: refs/heads/master
Commit: 80c9dcbeaed4976649ad5398fff2d3b1540d9cb7
Parents: ce16685
Author: ggregory <gg...@apache.org>
Authored: Wed Sep 9 16:01:27 2015 -0700
Committer: ggregory <gg...@apache.org>
Committed: Wed Sep 9 16:01:27 2015 -0700

----------------------------------------------------------------------
 log4j-core/pom.xml                              |   6 +
 .../appender/mom/jeromq/JeroMqAppender.java     | 350 +++++++++++++++++++
 .../appender/mom/jeromq/JeroMqAppenderTest.java | 130 +++++++
 .../appender/mom/jeromq/JeroMqTestClient.java   |  55 +++
 .../logging/log4j/junit/LoggerContextRule.java  |  25 ++
 .../src/test/resources/JeroMqAppenderTest.xml   |  30 ++
 pom.xml                                         |   5 +
 src/changes/changes.xml                         |   3 +
 src/site/site.xml                               |   1 +
 src/site/xdoc/manual/appenders.xml              | 161 +++++++++
 src/site/xdoc/runtime-dependencies.xml          |   8 +
 11 files changed, 774 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/log4j-core/pom.xml
----------------------------------------------------------------------
diff --git a/log4j-core/pom.xml b/log4j-core/pom.xml
index 1c00fbc..f8193e8 100644
--- a/log4j-core/pom.xml
+++ b/log4j-core/pom.xml
@@ -114,6 +114,12 @@
       <artifactId>kafka-clients</artifactId>
       <optional>true</optional>
     </dependency>
+    <!-- Used for ZeroMQ JeroMQ appender -->
+    <dependency>
+      <groupId>org.zeromq</groupId>
+      <artifactId>jeromq</artifactId>
+      <optional>true</optional>
+    </dependency>
     <!-- Used for compressing to formats other than zip and gz -->
     <dependency>
       <groupId>org.apache.commons</groupId>

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
new file mode 100644
index 0000000..90ed8f5
--- /dev/null
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.jeromq;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.Layout;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginElement;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+import org.apache.logging.log4j.status.StatusLogger;
+import org.apache.logging.log4j.util.PropertiesUtil;
+import org.apache.logging.log4j.util.Strings;
+import org.zeromq.ZMQ;
+import org.zeromq.ZMQ.Socket;
+
+/**
+ * Sends log events to one or more ZeroMQ (JeroMQ) endpoints.
+ * <p>
+ * Requires the JeroMQ jar (LGPL as of 0.3.5)
+ * </p>
+ */
+// TODO
+// Some methods are synchronized because a ZMQ.Socket is not thread-safe
+// Using a ThreadLocal for the publisher hangs tests on shutdown. There must be
+// some issue on threads owning certain resources as opposed to others.
+@Plugin(name = "JeroMQ", category = "Core", elementType = "appender", printObject = true)
+public final class JeroMqAppender extends AbstractAppender {
+
+    // Per ZMQ docs, there should usually only be one ZMQ context per process.
+    private static volatile ZMQ.Context context;
+
+    private static Logger logger;
+
+    // ZMQ sockets are not thread safe.
+    private static ZMQ.Socket publisher;
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String SIMPLE_NAME = JeroMqAppender.class.getSimpleName();
+
+    static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook";
+
+    static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
+
+    static {
+        logger = StatusLogger.getLogger();
+        final PropertiesUtil managerProps = PropertiesUtil.getProperties();
+        final Integer ioThreads = managerProps.getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
+        final Boolean enableShutdownHook = managerProps.getBooleanProperty(SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
+        final String simpleName = SIMPLE_NAME;
+        logger.trace("{} using ZMQ version {}", simpleName, ZMQ.getVersionString());
+        logger.trace("{} creating ZMQ context with ioThreads={}", simpleName, ioThreads);
+        context = ZMQ.context(ioThreads);
+        logger.trace("{} created ZMQ context {}", simpleName, context);
+        if (enableShutdownHook) {
+            final Thread hook = new Thread(simpleName + "-shutdown") {
+                @Override
+                public void run() {
+                    shutdown();
+                }
+            };
+            logger.trace("{} adding shutdown hook {}", simpleName, hook);
+            Runtime.getRuntime().addShutdownHook(hook);
+        }
+    }
+
+    // The ZMQ.Socket class has other set methods that we do not cover because
+    // they throw unsupported operation exceptions.
+    @PluginFactory
+    public static JeroMqAppender createAppender(
+            // @formatter:off
+            @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name,
+            @PluginElement("Layout") Layout<?> layout,
+            @PluginElement("Filters") final Filter filter,
+            @PluginElement("Properties") final Property[] properties,
+            // Super attributes
+            @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions,
+            // ZMQ attributes; defaults picked from zmq.Options (int parameters declare defaultInt, long parameters defaultLong).
+            @PluginAttribute(value="affinity", defaultLong=0) final long affinity,
+            @PluginAttribute(value="backlog", defaultLong=100) final long backlog,
+            @PluginAttribute(value="delayAttachOnConnect", defaultBoolean=false) final boolean delayAttachOnConnect,
+            @PluginAttribute(value="identity") final byte[] identity,
+            @PluginAttribute(value="ipv4Only", defaultBoolean=true) final boolean ipv4Only,
+            @PluginAttribute(value="linger", defaultLong=-1) final long linger,
+            @PluginAttribute(value="maxMsgSize", defaultLong=-1) final long maxMsgSize,
+            @PluginAttribute(value="rcvHwm", defaultLong=1000) final long rcvHwm,
+            @PluginAttribute(value="receiveBufferSize", defaultLong=0) final long receiveBufferSize,
+            @PluginAttribute(value="receiveTimeOut", defaultInt=-1) final int receiveTimeOut,
+            @PluginAttribute(value="reconnectIVL", defaultLong=100) final long reconnectIVL,
+            @PluginAttribute(value="reconnectIVLMax", defaultLong=0) final long reconnectIVLMax,
+            @PluginAttribute(value="sendBufferSize", defaultLong=0) final long sendBufferSize,
+            @PluginAttribute(value="sendTimeOut", defaultInt=-1) final int sendTimeOut,
+            @PluginAttribute(value="sndHwm", defaultLong=1000) final long sndHwm,
+            @PluginAttribute(value="tcpKeepAlive", defaultInt=-1) final int tcpKeepAlive,
+            @PluginAttribute(value="tcpKeepAliveCount", defaultLong=-1) final long tcpKeepAliveCount,
+            @PluginAttribute(value="tcpKeepAliveIdle", defaultLong=-1) final long tcpKeepAliveIdle,
+            @PluginAttribute(value="tcpKeepAliveInterval", defaultLong=-1) final long tcpKeepAliveInterval,
+            @PluginAttribute(value="xpubVerbose", defaultBoolean=false) final boolean xpubVerbose
+            // @formatter:on
+    ) {
+        if (layout == null) {
+            layout = PatternLayout.createDefaultLayout();
+        }
+        List<String> endpoints;
+        if (properties == null) {
+            endpoints = new ArrayList<>(0);
+        } else {
+            endpoints = new ArrayList<>(properties.length);
+            for (final Property property : properties) {
+                if ("endpoint".equalsIgnoreCase(property.getName())) {
+                    final String value = property.getValue();
+                    if (Strings.isNotEmpty(value)) {
+                        endpoints.add(value);
+                    }
+                }
+            }
+        }
+        logger.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}",
+                name, filter, layout, ignoreExceptions, endpoints);
+        return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog,
+                delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize, receiveTimeOut,
+                reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive, tcpKeepAliveCount,
+                tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose);
+    }
+
+    static ZMQ.Context getContext() {
+        return context;
+    }
+
+    private static ZMQ.Socket getPublisher() {
+        return publisher;
+    }
+
+    private static ZMQ.Socket newPublisher() {
+        logger.trace("{} creating a new ZMQ PUB socket with context {}", SIMPLE_NAME, context);
+        final Socket socketPub = context.socket(ZMQ.PUB);
+        logger.trace("{} created new ZMQ PUB socket {}", SIMPLE_NAME, socketPub);
+        return socketPub;
+    }
+
+    static void shutdown() {
+        if (context != null) {
+            logger.trace("{} terminating JeroMQ context {}", SIMPLE_NAME, context);
+            context.term();
+            context = null;
+        }
+    }
+
+    private final long affinity;
+    private final long backlog;
+    private final boolean delayAttachOnConnect;
+    private final List<String> endpoints;
+    private final byte[] identity;
+    private final int ioThreads = 1;
+    private final boolean ipv4Only;
+    private final long linger;
+    private final long maxMsgSize;
+    private final long rcvHwm;
+    private final long receiveBufferSize;
+    private final int receiveTimeOut;
+    private final long reconnectIVL;
+    private final long reconnectIVLMax;
+    private final long sendBufferSize;
+    private int sendRcFalse;
+    private int sendRcTrue;
+    private final int sendTimeOut;
+    private final long sndHwm;
+    private final int tcpKeepAlive;
+    private final long tcpKeepAliveCount;
+    private final long tcpKeepAliveIdle;
+    private final long tcpKeepAliveInterval;
+    private final boolean xpubVerbose;
+
+    private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
+            final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog,
+            final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger,
+            final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut,
+            final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut,
+            final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
+            final long tcpKeepAliveInterval, final boolean xpubVerbose) {
+        super(name, filter, layout, ignoreExceptions);
+        this.endpoints = endpoints;
+        this.affinity = affinity;
+        this.backlog = backlog;
+        this.delayAttachOnConnect = delayAttachOnConnect;
+        this.identity = identity;
+        this.ipv4Only = ipv4Only;
+        this.linger = linger;
+        this.maxMsgSize = maxMsgSize;
+        this.rcvHwm = rcvHwm;
+        this.receiveBufferSize = receiveBufferSize;
+        this.receiveTimeOut = receiveTimeOut;
+        this.reconnectIVL = reconnectIVL;
+        this.reconnectIVLMax = reconnectIVLMax;
+        this.sendBufferSize = sendBufferSize;
+        this.sendTimeOut = sendTimeOut;
+        this.sndHwm = sndHWM;
+        this.tcpKeepAlive = tcpKeepAlive;
+        this.tcpKeepAliveCount = tcpKeepAliveCount;
+        this.tcpKeepAliveIdle = tcpKeepAliveIdle;
+        this.tcpKeepAliveInterval = tcpKeepAliveInterval;
+        this.xpubVerbose = xpubVerbose;
+    }
+
+    @Override
+    public synchronized void append(final LogEvent event) {
+        final String formattedMessage = event.getMessage().getFormattedMessage();
+        if (getPublisher().send(formattedMessage, 0)) {
+            sendRcTrue++;
+        } else {
+            sendRcFalse++;
+            logger.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse,
+                    formattedMessage);
+        }
+    }
+
+    // not public, handy for testing
+    int getSendRcFalse() {
+        return sendRcFalse;
+    }
+
+    // not public, handy for testing
+    int getSendRcTrue() {
+        return sendRcTrue;
+    }
+
+    // not public, handy for testing
+    void resetSendRcs() {
+        sendRcTrue = sendRcFalse = 0;
+    }
+
+    @Override
+    public synchronized void start() {
+        super.start();
+        publisher = newPublisher();
+        final String name = getName();
+        final String prefix = "JeroMQ Appender";
+        logger.debug("Starting {} {} using ZMQ version {}", prefix, name, ZMQ.getVersionString());
+        logger.debug("{} {} context {} with ioThreads={}", prefix, name, context, ioThreads);
+        //
+        final ZMQ.Socket socketPub = getPublisher();
+        logger.trace("{} {} setting {} publisher properties for instance {}", prefix, name,
+                socketPub.getClass().getName(), socketPub);
+        logger.trace("{} {} publisher setAffinity({})", prefix, name, affinity);
+        socketPub.setAffinity(affinity);
+        logger.trace("{} {} publisher setBacklog({})", prefix, name, backlog);
+        socketPub.setBacklog(backlog);
+        logger.trace("{} {} publisher setDelayAttachOnConnect({})", prefix, name, delayAttachOnConnect);
+        socketPub.setDelayAttachOnConnect(delayAttachOnConnect);
+        if (identity != null) {
+            logger.trace("{} {} publisher setIdentity({})", prefix, name, Arrays.toString(identity));
+            socketPub.setIdentity(identity);
+        }
+        logger.trace("{} {} publisher setIPv4Only({})", prefix, name, ipv4Only);
+        socketPub.setIPv4Only(ipv4Only);
+        logger.trace("{} {} publisher setLinger({})", prefix, name, linger);
+        socketPub.setLinger(linger);
+        logger.trace("{} {} publisher setMaxMsgSize({})", prefix, name, maxMsgSize);
+        socketPub.setMaxMsgSize(maxMsgSize);
+        logger.trace("{} {} publisher setRcvHWM({})", prefix, name, rcvHwm);
+        socketPub.setRcvHWM(rcvHwm);
+        logger.trace("{} {} publisher setReceiveBufferSize({})", prefix, name, receiveBufferSize);
+        socketPub.setReceiveBufferSize(receiveBufferSize);
+        logger.trace("{} {} publisher setReceiveTimeOut({})", prefix, name, receiveTimeOut);
+        socketPub.setReceiveTimeOut(receiveTimeOut);
+        logger.trace("{} {} publisher setReconnectIVL({})", prefix, name, reconnectIVL);
+        socketPub.setReconnectIVL(reconnectIVL);
+        logger.trace("{} {} publisher setReconnectIVLMax({})", prefix, name, reconnectIVLMax);
+        socketPub.setReconnectIVLMax(reconnectIVLMax);
+        logger.trace("{} {} publisher setSendBufferSize({})", prefix, name, sendBufferSize);
+        socketPub.setSendBufferSize(sendBufferSize);
+        logger.trace("{} {} publisher setSendTimeOut({})", prefix, name, sendTimeOut);
+        socketPub.setSendTimeOut(sendTimeOut);
+        logger.trace("{} {} publisher setSndHWM({})", prefix, name, sndHwm);
+        socketPub.setSndHWM(sndHwm);
+        logger.trace("{} {} publisher setTCPKeepAlive({})", prefix, name, tcpKeepAlive);
+        socketPub.setTCPKeepAlive(tcpKeepAlive);
+        logger.trace("{} {} publisher setTCPKeepAliveCount({})", prefix, name, tcpKeepAliveCount);
+        socketPub.setTCPKeepAliveCount(tcpKeepAliveCount);
+        logger.trace("{} {} publisher setTCPKeepAliveIdle({})", prefix, name, tcpKeepAliveIdle);
+        socketPub.setTCPKeepAliveIdle(tcpKeepAliveIdle);
+        logger.trace("{} {} publisher setTCPKeepAliveInterval({})", prefix, name, tcpKeepAliveInterval);
+        socketPub.setTCPKeepAliveInterval(tcpKeepAliveInterval);
+        logger.trace("{} {} publisher setXpubVerbose({})", prefix, name, xpubVerbose);
+        socketPub.setXpubVerbose(xpubVerbose);
+        //
+        if (logger.isDebugEnabled()) {
+            logger.debug(
+                    "Created JeroMQ {} publisher {} type {}, affinity={}, backlog={}, delayAttachOnConnect={}, events={}, IPv4Only={}, linger={}, maxMsgSize={}, multicastHops={}, "
+                            + "rate={}, rcvHWM={}, receiveBufferSize={}, receiveTimeOut={}, reconnectIVL={}, reconnectIVLMax={}, recoveryInterval={}, sendBufferSize={}, "
+                            + "sendTimeOut={}, sndHWM={}, TCPKeepAlive={}, TCPKeepAliveCount={}, TCPKeepAliveIdle={}, TCPKeepAliveInterval={}, TCPKeepAliveSetting={}",
+                    name, socketPub, socketPub.getType(), socketPub.getAffinity(), socketPub.getBacklog(),
+                    socketPub.getDelayAttachOnConnect(), socketPub.getEvents(), socketPub.getIPv4Only(),
+                    socketPub.getLinger(), socketPub.getMaxMsgSize(), socketPub.getMulticastHops(), socketPub.getRate(),
+                    socketPub.getRcvHWM(), socketPub.getReceiveBufferSize(), socketPub.getReceiveTimeOut(),
+                    socketPub.getReconnectIVL(), socketPub.getReconnectIVLMax(), socketPub.getRecoveryInterval(),
+                    socketPub.getSendBufferSize(), socketPub.getSendTimeOut(), socketPub.getSndHWM(),
+                    socketPub.getTCPKeepAlive(), socketPub.getTCPKeepAliveCount(), socketPub.getTCPKeepAliveIdle(),
+                    socketPub.getTCPKeepAliveInterval(), socketPub.getTCPKeepAliveSetting());
+        }
+        for (final String endpoint : endpoints) {
+            logger.debug("Binding {} appender {} to endpoint {}", SIMPLE_NAME, name, endpoint);
+            socketPub.bind(endpoint);
+        }
+    }
+
+    @Override
+    public synchronized void stop() {
+        super.stop();
+        final Socket socketPub = getPublisher();
+        if (socketPub != null) {
+            logger.debug("Closing {} appender {} publisher {}", SIMPLE_NAME, getName(), socketPub);
+            socketPub.close();
+            publisher = null;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "JeroMqAppender [context=" + context + ", publisher=" + publisher + ", endpoints=" + endpoints + "]";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java
new file mode 100644
index 0000000..041419c
--- /dev/null
+++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.jeromq;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.logging.log4j.core.Logger;
+import org.apache.logging.log4j.junit.LoggerContextRule;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class JeroMqAppenderTest {
+
+    @AfterClass
+    public static void tearDownClass() {
+        // JeroMqAppender.shutdown();
+    }
+
+    @ClassRule
+    public static LoggerContextRule ctx = new LoggerContextRule("JeroMqAppenderTest.xml");
+
+    @Test(timeout = 10000)
+    public void testAppenderLifeCycle() throws Exception {
+        // do nothing to make sure the appender starts and stops without
+        // locking up resources.
+        Assert.assertNotNull(JeroMqAppender.getContext());
+    }
+
+    @Test(timeout = 10000)
+    public void testClientServer() throws Exception {
+        final JeroMqAppender appender = ctx.getRequiredAppender("JeroMQAppender", JeroMqAppender.class);
+        final int expectedReceiveCount = 2;
+        final JeroMqTestClient client = new JeroMqTestClient(JeroMqAppender.getContext(), "tcp://localhost:5556", expectedReceiveCount);
+        final ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            final Future<List<String>> future = executor.submit(client);
+            Thread.sleep(100);
+            final Logger logger = ctx.getLogger(getClass().getName());
+            appender.resetSendRcs();
+            logger.info("Hello");
+            logger.info("Again");
+            final List<String> list = future.get();
+            Assert.assertEquals(expectedReceiveCount, appender.getSendRcTrue());
+            Assert.assertEquals(0, appender.getSendRcFalse());
+            Assert.assertEquals("Hello", list.get(0));
+            Assert.assertEquals("Again", list.get(1));
+        } finally {
+            executor.shutdown();
+        }
+    }
+
+    @Test(timeout = 10000)
+    public void testMultiThreadedServer() throws Exception {
+        final int nThreads = 10;
+        final JeroMqAppender appender = ctx.getRequiredAppender("JeroMQAppender", JeroMqAppender.class);
+        final int expectedReceiveCount = 2 * nThreads;
+        final JeroMqTestClient client = new JeroMqTestClient(JeroMqAppender.getContext(), "tcp://localhost:5556",
+                expectedReceiveCount);
+        final ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            final Future<List<String>> future = executor.submit(client);
+            Thread.sleep(100);
+            final Logger logger = ctx.getLogger(getClass().getName());
+            appender.resetSendRcs();
+            final ExecutorService fixedThreadPool = Executors.newFixedThreadPool(nThreads);
+            for (int i = 0; i < nThreads; i++) {
+                fixedThreadPool.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        logger.info("Hello");
+                        logger.info("Again");
+                    }
+                });
+            }
+            final List<String> list = future.get();
+            Assert.assertEquals(expectedReceiveCount, appender.getSendRcTrue());
+            Assert.assertEquals(0, appender.getSendRcFalse());
+            int hello = 0;
+            int again = 0;
+            for (final String string : list) {
+                switch (string) {
+                case "Hello":
+                    hello++;
+                    break;
+                case "Again":
+                    again++;
+                    break;
+                default:
+                    Assert.fail("Unexpected message: " + string);
+                }
+            }
+            Assert.assertEquals(nThreads, hello);
+            Assert.assertEquals(nThreads, again);
+        } finally {
+            executor.shutdown();
+        }
+    }
+
+    @Test(timeout = 10000)
+    public void testServerOnly() throws Exception {
+        final Logger logger = ctx.getLogger(getClass().getName());
+        final JeroMqAppender appender = ctx.getRequiredAppender("JeroMQAppender", JeroMqAppender.class);
+        appender.resetSendRcs();
+        logger.info("Hello");
+        logger.info("Again");
+        Assert.assertEquals(2, appender.getSendRcTrue());
+        Assert.assertEquals(0, appender.getSendRcFalse());
+    }
+}

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqTestClient.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqTestClient.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqTestClient.java
new file mode 100644
index 0000000..ddd06ab
--- /dev/null
+++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqTestClient.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.jeromq;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.zeromq.ZMQ;
+
+class JeroMqTestClient implements Callable<List<String>> {
+
+    private final ZMQ.Context context;
+
+    private final String endpoint;
+    private final List<String> messages;
+    private final int receiveCount;
+    
+    JeroMqTestClient(final ZMQ.Context context, final String endpoint, final int receiveCount) {
+        super();
+        this.context = context;
+        this.endpoint = endpoint;
+        this.receiveCount = receiveCount;
+        this.messages = new ArrayList<>(receiveCount);
+    }
+
+    @Override
+    public List<String> call() throws Exception {
+        try (ZMQ.Socket subscriber = context.socket(ZMQ.SUB)) {
+            subscriber.connect(endpoint);
+            subscriber.subscribe(new byte[0]);
+            for (int messageNum = 0; messageNum < receiveCount
+                    && !Thread.currentThread().isInterrupted(); messageNum++) {
+                // Use trim to remove the trailing '0' character
+                messages.add(subscriber.recvStr(0).trim());
+            }
+        }
+        return messages;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/log4j-core/src/test/java/org/apache/logging/log4j/junit/LoggerContextRule.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/junit/LoggerContextRule.java b/log4j-core/src/test/java/org/apache/logging/log4j/junit/LoggerContextRule.java
index 65836d6..006d70c 100644
--- a/log4j-core/src/test/java/org/apache/logging/log4j/junit/LoggerContextRule.java
+++ b/log4j-core/src/test/java/org/apache/logging/log4j/junit/LoggerContextRule.java
@@ -117,6 +117,17 @@ public class LoggerContextRule implements TestRule {
     }
 
     /**
+     * Gets a named Appender for this LoggerContext.
+     * @param <T> The target Appender class
+     * @param name the name of the Appender to look up.
+     * @param cls The target Appender class
+     * @return the named Appender or {@code null} if it wasn't defined in the configuration.
+     */
+    public <T extends Appender> T getAppender(final String name, Class<T> cls) {
+        return cls.cast(getConfiguration().getAppenders().get(name));
+    }
+
+    /**
      * Gets a named Appender or throws an exception for this LoggerContext.
      * @param name the name of the Appender to look up.
      * @return the named Appender.
@@ -129,6 +140,20 @@ public class LoggerContextRule implements TestRule {
     }
 
     /**
+     * Gets a named Appender or throws an exception for this LoggerContext.
+     * @param <T> The target Appender class
+     * @param name the name of the Appender to look up.
+     * @param cls The target Appender class
+     * @return the named Appender.
+     * @throws AssertionError if the Appender doesn't exist.
+     */
+    public <T extends Appender> T getRequiredAppender(final String name, Class<T> cls) {
+        final T appender = getAppender(name, cls);
+        assertNotNull("Appender named " + name + " was null.", appender);
+        return appender;
+    }
+
+    /**
      * Gets a named ListAppender or throws an exception for this LoggerContext.
      * @param name the name of the ListAppender to look up.
      * @return the named ListAppender.

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/log4j-core/src/test/resources/JeroMqAppenderTest.xml
----------------------------------------------------------------------
diff --git a/log4j-core/src/test/resources/JeroMqAppenderTest.xml b/log4j-core/src/test/resources/JeroMqAppenderTest.xml
new file mode 100644
index 0000000..119fc00
--- /dev/null
+++ b/log4j-core/src/test/resources/JeroMqAppenderTest.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache license, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the license for the specific language governing permissions and
+  ~ limitations under the license.
+  -->
+<Configuration name="JeroMQAppenderTest" status="TRACE">
+  <Appenders>
+    <JeroMQ name="JeroMQAppender">    
+      <Property name="endpoint">tcp://*:5556</Property>
+      <Property name="endpoint">ipc://info-topic</Property>
+    </JeroMQ>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="JeroMQAppender"/>
+    </Root>
+  </Loggers>
+</Configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 019b089..fbbb657 100644
--- a/pom.xml
+++ b/pom.xml
@@ -565,6 +565,11 @@
         <version>0.8.2.1</version>
       </dependency>
       <dependency>
+        <groupId>org.zeromq</groupId>
+        <artifactId>jeromq</artifactId>
+        <version>0.3.5</version>
+      </dependency>
+      <dependency>
         <groupId>javax.servlet</groupId>
         <artifactId>servlet-api</artifactId>
         <version>2.5</version>

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/src/changes/changes.xml
----------------------------------------------------------------------
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index 5de063c..6d1d57e 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -40,6 +40,9 @@
       <action issue="LOG4J2-1107" dev="ggregory" type="add" due-to="Mikael Ståldal">
         New Appender for Apache Kafka.
       </action>
+      <action issue="LOG4J2-1113" dev="ggregory" type="add" due-to="Gary Gregory">
+        New publisher Appender for ZeroMQ (using JeroMQ).
+      </action>
       <action issue="LOG4J2-1088" dev="ggregory" type="add" due-to="Gary Gregory">
         Add Comma Separated Value (CSV) layouts for parameter and event logging.
       </action>

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/src/site/site.xml
----------------------------------------------------------------------
diff --git a/src/site/site.xml b/src/site/site.xml
index 3a2940d..7c82498 100644
--- a/src/site/site.xml
+++ b/src/site/site.xml
@@ -131,6 +131,7 @@
         <item name="SMTP" href="/manual/appenders.html#SMTPAppender"/>
         <item name="Socket" href="/manual/appenders.html#SocketAppender"/>
         <item name="Syslog" href="/manual/appenders.html#SyslogAppender"/>
+        <item name="ZeroMQ" href="/manual/appenders.html#ZeroMQAppender"/>        
       </item>
 
       <item name="Layouts" href="/manual/layouts.html" collapse="true">

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/src/site/xdoc/manual/appenders.xml
----------------------------------------------------------------------
diff --git a/src/site/xdoc/manual/appenders.xml b/src/site/xdoc/manual/appenders.xml
index a99ad88..49f7dec 100644
--- a/src/site/xdoc/manual/appenders.xml
+++ b/src/site/xdoc/manual/appenders.xml
@@ -3288,6 +3288,167 @@ public class JpaLogEntity extends AbstractLogEventWrapperEntity {
 </Configuration>]]></pre>
         </subsection>
 
+        <a name="ZeroMQAppender"/>
+        <subsection name="ZeroMQ Appender">
+          <p>
+            The ZeroMQ appender uses the <a href="https://github.com/zeromq/jeromq">JeroMQ</a> library to send log 
+            events to one or more endpoints.  
+          </p>
+          <p>
+            This is a simple JeroMQ configuration:
+          </p>
+          <pre class="prettyprint linenums"><![CDATA[<?xml version="1.0" encoding="UTF-8"?>
+<Configuration name="JeroMQAppenderTest" status="TRACE">
+  <Appenders>
+    <JeroMQ name="JeroMQAppender">    
+      <Property name="endpoint">tcp://*:5556</Property>
+      <Property name="endpoint">ipc://info-topic</Property>
+    </JeroMQ>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="JeroMQAppender"/>
+    </Root>
+  </Loggers>
+</Configuration>]]></pre> 
+          <p>
+            The table below describes all options. Please consult the JeroMQ and ZeroMQ documentation for details.
+          </p>
+          <table>
+            <caption align="top">JeroMQ Parameters</caption>
+            <tr>
+              <th>Parameter Name</th>
+              <th>Type</th>
+              <th>Description</th>
+            </tr>
+            <tr>
+              <td>name</td>
+              <td>String</td>
+              <td>The name of the Appender.</td>
+            </tr>
+            <tr>
+              <td>Layout</td>
+              <td>Layout</td>
+              <td>The Layout of the Appender.</td>
+            </tr>
+            <tr>
+              <td>Filters</td>
+              <td>Filter</td>
+              <td>The Filters of the Appender.</td>
+            </tr>
+            <tr>
+              <td>Property</td>
+              <td>Property</td>
+              <td>One or more Property elements, each named <code>endpoint</code>.</td>
+            </tr>
+            <tr>
+              <td>ignoreExceptions</td>
+              <td>boolean</td>
+              <td>If true, exceptions will be logged and suppressed. If false, exceptions will be logged and then passed to the application.</td>
+            </tr>
+            <tr>
+              <td>affinity</td>
+              <td>long</td>
+              <td>The ZMQ_AFFINITY option. Defaults to 0.</td>
+            </tr>
+            <tr>
+              <td>backlog</td>
+              <td>long</td>
+              <td>The ZMQ_BACKLOG option. Defaults to 100.</td>
+            </tr>
+            <tr>
+              <td>delayAttachOnConnect</td>
+              <td>boolean</td>
+              <td>The ZMQ_DELAY_ATTACH_ON_CONNECT option. Defaults to false.</td>
+            </tr>
+            <tr>
+              <td>identity</td>
+              <td>byte[]</td>
+              <td>The ZMQ_IDENTITY option. Defaults to none.</td>
+            </tr>
+            <tr>
+              <td>ipv4Only</td>
+              <td>boolean</td>
+              <td>The ZMQ_IPV4ONLY option. Defaults to true.</td>
+            </tr>
+            <tr>
+              <td>linger</td>
+              <td>long</td>
+              <td>The ZMQ_LINGER option. Defaults to -1.</td>
+            </tr>
+            <tr>
+              <td>maxMsgSize</td>
+              <td>long</td>
+              <td>The ZMQ_MAXMSGSIZE option. Defaults to -1.</td>
+            </tr>
+            <tr>
+              <td>rcvHwm</td>
+              <td>long</td>
+              <td>The ZMQ_RCVHWM option. Defaults to 1000.</td>
+            </tr>
+            <tr>
+              <td>receiveBufferSize</td>
+              <td>long</td>
+              <td>The ZMQ_RCVBUF option. Defaults to 0.</td>
+            </tr>
+            <tr>
+              <td>receiveTimeOut</td>
+              <td>int</td>
+              <td>The ZMQ_RCVTIMEO option. Defaults to -1.</td>
+            </tr>
+            <tr>
+              <td>reconnectIVL</td>
+              <td>long</td>
+              <td>The ZMQ_RECONNECT_IVL option. Defaults to 100.</td>
+            </tr>
+            <tr>
+              <td>reconnectIVLMax</td>
+              <td>long</td>
+              <td>The ZMQ_RECONNECT_IVL_MAX option. Defaults to 0.</td>
+            </tr>
+            <tr>
+              <td>sendBufferSize</td>
+              <td>long</td>
+              <td>The ZMQ_SNDBUF option. Defaults to 0.</td>
+            </tr>
+            <tr>
+              <td>sendTimeOut</td>
+              <td>int</td>
+              <td>The ZMQ_SNDTIMEO option. Defaults to -1.</td>
+            </tr>
+            <tr>
+              <td>sndHwm</td>
+              <td>long</td>
+              <td>The ZMQ_SNDHWM option. Defaults to 1000.</td>
+            </tr>
+            <tr>
+              <td>tcpKeepAlive</td>
+              <td>int</td>
+              <td>The ZMQ_TCP_KEEPALIVE option. Defaults to -1.</td>
+            </tr>
+            <tr>
+              <td>tcpKeepAliveCount</td>
+              <td>long</td>
+              <td>The ZMQ_TCP_KEEPALIVE_CNT option. Defaults to -1.</td>
+            </tr>
+            <tr>
+              <td>tcpKeepAliveIdle</td>
+              <td>long</td>
+              <td>The ZMQ_TCP_KEEPALIVE_IDLE option. Defaults to -1.</td>
+            </tr>
+            <tr>
+              <td>tcpKeepAliveInterval</td>
+              <td>long</td>
+              <td>The ZMQ_TCP_KEEPALIVE_INTVL option. Defaults to -1.</td>
+            </tr>
+            <tr>
+              <td>xpubVerbose</td>
+              <td>boolean</td>
+              <td>The ZMQ_XPUB_VERBOSE option. Defaults to false.</td>
+            </tr>
+          </table>          
+        </subsection>
+
       </section>
   </body>
 </document>

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/src/site/xdoc/runtime-dependencies.xml
----------------------------------------------------------------------
diff --git a/src/site/xdoc/runtime-dependencies.xml b/src/site/xdoc/runtime-dependencies.xml
index bbee62a..ace4141 100644
--- a/src/site/xdoc/runtime-dependencies.xml
+++ b/src/site/xdoc/runtime-dependencies.xml
@@ -100,6 +100,14 @@
             In addition, XZ requires <a href="http://tukaani.org/xz/java.html">XZ for Java</a>.
           </td>
         </tr>
+        <tr>
+          <td>ZeroMQ Appender</td>
+          <td>
+            The ZeroMQ appender uses the <a href="https://github.com/zeromq/jeromq">JeroMQ</a> library, which is
+            licensed under the terms of the GNU Lesser General Public License (LGPL). For details, see the
+            files <code>COPYING</code> and <code>COPYING.LESSER</code> included with the JeroMQ distribution.
+          </td>
+        </tr>
       </table>
 
       <a name="log4j-jcl" />