You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by gg...@apache.org on 2015/09/10 01:01:37 UTC
logging-log4j2 git commit: [LOG4J2-1113] New publisher Appender for
ZeroMQ (using JeroMQ).
Repository: logging-log4j2
Updated Branches:
refs/heads/master ce1668592 -> 80c9dcbea
[LOG4J2-1113] New publisher Appender for ZeroMQ (using JeroMQ).
Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/80c9dcbe
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/80c9dcbe
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/80c9dcbe
Branch: refs/heads/master
Commit: 80c9dcbeaed4976649ad5398fff2d3b1540d9cb7
Parents: ce16685
Author: ggregory <gg...@apache.org>
Authored: Wed Sep 9 16:01:27 2015 -0700
Committer: ggregory <gg...@apache.org>
Committed: Wed Sep 9 16:01:27 2015 -0700
----------------------------------------------------------------------
log4j-core/pom.xml | 6 +
.../appender/mom/jeromq/JeroMqAppender.java | 350 +++++++++++++++++++
.../appender/mom/jeromq/JeroMqAppenderTest.java | 130 +++++++
.../appender/mom/jeromq/JeroMqTestClient.java | 55 +++
.../logging/log4j/junit/LoggerContextRule.java | 25 ++
.../src/test/resources/JeroMqAppenderTest.xml | 30 ++
pom.xml | 5 +
src/changes/changes.xml | 3 +
src/site/site.xml | 1 +
src/site/xdoc/manual/appenders.xml | 161 +++++++++
src/site/xdoc/runtime-dependencies.xml | 8 +
11 files changed, 774 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/log4j-core/pom.xml
----------------------------------------------------------------------
diff --git a/log4j-core/pom.xml b/log4j-core/pom.xml
index 1c00fbc..f8193e8 100644
--- a/log4j-core/pom.xml
+++ b/log4j-core/pom.xml
@@ -114,6 +114,12 @@
<artifactId>kafka-clients</artifactId>
<optional>true</optional>
</dependency>
+ <!-- Used for ZeroMQ JeroMQ appender -->
+ <dependency>
+ <groupId>org.zeromq</groupId>
+ <artifactId>jeromq</artifactId>
+ <optional>true</optional>
+ </dependency>
<!-- Used for compressing to formats other than zip and gz -->
<dependency>
<groupId>org.apache.commons</groupId>
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
new file mode 100644
index 0000000..90ed8f5
--- /dev/null
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.jeromq;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.Layout;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginElement;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+import org.apache.logging.log4j.status.StatusLogger;
+import org.apache.logging.log4j.util.PropertiesUtil;
+import org.apache.logging.log4j.util.Strings;
+import org.zeromq.ZMQ;
+import org.zeromq.ZMQ.Socket;
+
+/**
+ * Sends log events to one or more ZeroMQ (JeroMQ) endpoints.
+ * <p>
+ * Requires the JeroMQ jar (LGPL as of 0.3.5)
+ * </p>
+ */
+// TODO
+// Some methods are synchronized because a ZMQ.Socket is not thread-safe
+// Using a ThreadLocal for the publisher hangs tests on shutdown. There must be
+// some issue on threads owning certain resources as opposed to others.
+@Plugin(name = "JeroMQ", category = "Core", elementType = "appender", printObject = true)
+public final class JeroMqAppender extends AbstractAppender {
+
+ // Per ZMQ docs, there should usually only be one ZMQ context per process.
+ private static volatile ZMQ.Context context;
+
+ private static Logger logger;
+
+ // ZMQ sockets are not thread safe.
+ private static ZMQ.Socket publisher;
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String SIMPLE_NAME = JeroMqAppender.class.getSimpleName();
+
+ static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook";
+
+ static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
+
+ static {
+ logger = StatusLogger.getLogger();
+ final PropertiesUtil managerProps = PropertiesUtil.getProperties();
+ final Integer ioThreads = managerProps.getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
+ final Boolean enableShutdownHook = managerProps.getBooleanProperty(SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
+ final String simpleName = SIMPLE_NAME;
+ logger.trace("{} using ZMQ version {}", simpleName, ZMQ.getVersionString());
+ logger.trace("{} creating ZMQ context with ioThreads={}", simpleName, ioThreads);
+ context = ZMQ.context(ioThreads);
+ logger.trace("{} created ZMQ context {}", simpleName, context);
+ if (enableShutdownHook) {
+ final Thread hook = new Thread(simpleName + "-shutdown") {
+ @Override
+ public void run() {
+ shutdown();
+ }
+ };
+ logger.trace("{} adding shutdown hook {}", simpleName, hook);
+ Runtime.getRuntime().addShutdownHook(hook);
+ }
+ }
+
+ // The ZMQ.Socket class has other set methods that we do not cover because
+ // they throw unsupported operation exceptions.
+ @PluginFactory
+ public static JeroMqAppender createAppender(
+ // @formatter:off
+ @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name,
+ @PluginElement("Layout") Layout<?> layout,
+ @PluginElement("Filters") final Filter filter,
+ @PluginElement("Properties") final Property[] properties,
+ // Super attributes
+ @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions,
+ // ZMQ attributes; defaults picked from zmq.Options.
+ @PluginAttribute(value="affinity", defaultLong=0) final long affinity,
+ @PluginAttribute(value="backlog", defaultLong=100) final long backlog,
+ @PluginAttribute(value="delayAttachOnConnect", defaultBoolean=false) final boolean delayAttachOnConnect,
+ @PluginAttribute(value="identity") final byte[] identity,
+ @PluginAttribute(value="ipv4Only", defaultBoolean=true) final boolean ipv4Only,
+ @PluginAttribute(value="linger", defaultLong=-1) final long linger,
+ @PluginAttribute(value="maxMsgSize", defaultLong=-1) final long maxMsgSize,
+ @PluginAttribute(value="rcvHwm", defaultLong=1000) final long rcvHwm,
+ @PluginAttribute(value="receiveBufferSize", defaultLong=0) final long receiveBufferSize,
+ @PluginAttribute(value="receiveTimeOut", defaultLong=-1) final int receiveTimeOut,
+ @PluginAttribute(value="reconnectIVL", defaultLong=100) final long reconnectIVL,
+ @PluginAttribute(value="reconnectIVLMax", defaultLong=0) final long reconnectIVLMax,
+ @PluginAttribute(value="sendBufferSize", defaultLong=0) final long sendBufferSize,
+ @PluginAttribute(value="sendTimeOut", defaultLong=-1) final int sendTimeOut,
+ @PluginAttribute(value="sndHwm", defaultLong=1000) final long sndHwm,
+ @PluginAttribute(value="tcpKeepAlive", defaultInt=-1) final int tcpKeepAlive,
+ @PluginAttribute(value="tcpKeepAliveCount", defaultLong=-1) final long tcpKeepAliveCount,
+ @PluginAttribute(value="tcpKeepAliveIdle", defaultLong=-1) final long tcpKeepAliveIdle,
+ @PluginAttribute(value="tcpKeepAliveInterval", defaultLong=-1) final long tcpKeepAliveInterval,
+ @PluginAttribute(value="xpubVerbose", defaultBoolean=false) final boolean xpubVerbose
+ // @formatter:on
+ ) {
+ if (layout == null) {
+ layout = PatternLayout.createDefaultLayout();
+ }
+ List<String> endpoints;
+ if (properties == null) {
+ endpoints = new ArrayList<>(0);
+ } else {
+ endpoints = new ArrayList<>(properties.length);
+ for (final Property property : properties) {
+ if ("endpoint".equalsIgnoreCase(property.getName())) {
+ final String value = property.getValue();
+ if (Strings.isNotEmpty(value)) {
+ endpoints.add(value);
+ }
+ }
+ }
+ }
+ logger.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}",
+ name, filter, layout, ignoreExceptions, endpoints);
+ return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog,
+ delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize, receiveTimeOut,
+ reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive, tcpKeepAliveCount,
+ tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose);
+ }
+
+ static ZMQ.Context getContext() {
+ return context;
+ }
+
+ private static ZMQ.Socket getPublisher() {
+ return publisher;
+ }
+
+ private static ZMQ.Socket newPublisher() {
+ logger.trace("{} creating a new ZMQ PUB socket with context {}", SIMPLE_NAME, context);
+ final Socket socketPub = context.socket(ZMQ.PUB);
+ logger.trace("{} created new ZMQ PUB socket {}", SIMPLE_NAME, socketPub);
+ return socketPub;
+ }
+
+ static void shutdown() {
+ if (context != null) {
+ logger.trace("{} terminating JeroMQ context {}", SIMPLE_NAME, context);
+ context.term();
+ context = null;
+ }
+ }
+
+ private final long affinity;
+ private final long backlog;
+ private final boolean delayAttachOnConnect;
+ private final List<String> endpoints;
+ private final byte[] identity;
+ private final int ioThreads = 1;
+ private final boolean ipv4Only;
+ private final long linger;
+ private final long maxMsgSize;
+ private final long rcvHwm;
+ private final long receiveBufferSize;
+ private final int receiveTimeOut;
+ private final long reconnectIVL;
+ private final long reconnectIVLMax;
+ private final long sendBufferSize;
+ private int sendRcFalse;
+ private int sendRcTrue;
+ private final int sendTimeOut;
+ private final long sndHwm;
+ private final int tcpKeepAlive;
+ private final long tcpKeepAliveCount;
+ private final long tcpKeepAliveIdle;
+ private final long tcpKeepAliveInterval;
+ private final boolean xpubVerbose;
+
+ private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
+ final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog,
+ final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger,
+ final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut,
+ final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut,
+ final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
+ final long tcpKeepAliveInterval, final boolean xpubVerbose) {
+ super(name, filter, layout, ignoreExceptions);
+ this.endpoints = endpoints;
+ this.affinity = affinity;
+ this.backlog = backlog;
+ this.delayAttachOnConnect = delayAttachOnConnect;
+ this.identity = identity;
+ this.ipv4Only = ipv4Only;
+ this.linger = linger;
+ this.maxMsgSize = maxMsgSize;
+ this.rcvHwm = rcvHwm;
+ this.receiveBufferSize = receiveBufferSize;
+ this.receiveTimeOut = receiveTimeOut;
+ this.reconnectIVL = reconnectIVL;
+ this.reconnectIVLMax = reconnectIVLMax;
+ this.sendBufferSize = sendBufferSize;
+ this.sendTimeOut = sendTimeOut;
+ this.sndHwm = sndHWM;
+ this.tcpKeepAlive = tcpKeepAlive;
+ this.tcpKeepAliveCount = tcpKeepAliveCount;
+ this.tcpKeepAliveIdle = tcpKeepAliveIdle;
+ this.tcpKeepAliveInterval = tcpKeepAliveInterval;
+ this.xpubVerbose = xpubVerbose;
+ }
+
+ @Override
+ public synchronized void append(final LogEvent event) {
+ final String formattedMessage = event.getMessage().getFormattedMessage();
+ if (getPublisher().send(formattedMessage, 0)) {
+ sendRcTrue++;
+ } else {
+ sendRcFalse++;
+ logger.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse,
+ formattedMessage);
+ }
+ }
+
+ // not public, handy for testing
+ int getSendRcFalse() {
+ return sendRcFalse;
+ }
+
+ // not public, handy for testing
+ int getSendRcTrue() {
+ return sendRcTrue;
+ }
+
+ // not public, handy for testing
+ void resetSendRcs() {
+ sendRcTrue = sendRcFalse = 0;
+ }
+
+ @Override
+ public synchronized void start() {
+ super.start();
+ publisher = newPublisher();
+ final String name = getName();
+ final String prefix = "JeroMQ Appender";
+ logger.debug("Starting {} {} using ZMQ version {}", prefix, name, ZMQ.getVersionString());
+ logger.debug("{} {} context {} with ioThreads={}", prefix, name, context, ioThreads);
+ //
+ final ZMQ.Socket socketPub = getPublisher();
+ logger.trace("{} {} setting {} publisher properties for instance {}", prefix, name,
+ socketPub.getClass().getName(), socketPub);
+ logger.trace("{} {} publisher setAffinity({})", prefix, name, affinity);
+ socketPub.setAffinity(affinity);
+ logger.trace("{} {} publisher setBacklog({})", prefix, name, backlog);
+ socketPub.setBacklog(backlog);
+ logger.trace("{} {} publisher setDelayAttachOnConnect({})", prefix, name, delayAttachOnConnect);
+ socketPub.setDelayAttachOnConnect(delayAttachOnConnect);
+ if (identity != null) {
+ logger.trace("{} {} publisher setIdentity({})", prefix, name, Arrays.toString(identity));
+ socketPub.setIdentity(identity);
+ }
+ logger.trace("{} {} publisher setIPv4Only({})", prefix, name, ipv4Only);
+ socketPub.setIPv4Only(ipv4Only);
+ logger.trace("{} {} publisher setLinger({})", prefix, name, linger);
+ socketPub.setLinger(linger);
+ logger.trace("{} {} publisher setMaxMsgSize({})", prefix, name, maxMsgSize);
+ socketPub.setMaxMsgSize(maxMsgSize);
+ logger.trace("{} {} publisher setRcvHWM({})", prefix, name, rcvHwm);
+ socketPub.setRcvHWM(rcvHwm);
+ logger.trace("{} {} publisher setReceiveBufferSize({})", prefix, name, receiveBufferSize);
+ socketPub.setReceiveBufferSize(receiveBufferSize);
+ logger.trace("{} {} publisher setReceiveTimeOut({})", prefix, name, receiveTimeOut);
+ socketPub.setReceiveTimeOut(receiveTimeOut);
+ logger.trace("{} {} publisher setReconnectIVL({})", prefix, name, reconnectIVL);
+ socketPub.setReconnectIVL(reconnectIVL);
+ logger.trace("{} {} publisher setReconnectIVLMax({})", prefix, name, reconnectIVLMax);
+ socketPub.setReconnectIVLMax(reconnectIVLMax);
+ logger.trace("{} {} publisher setSendBufferSize({})", prefix, name, sendBufferSize);
+ socketPub.setSendBufferSize(sendBufferSize);
+ logger.trace("{} {} publisher setSendTimeOut({})", prefix, name, sendTimeOut);
+ socketPub.setSendTimeOut(sendTimeOut);
+ logger.trace("{} {} publisher setSndHWM({})", prefix, name, sndHwm);
+ socketPub.setSndHWM(sndHwm);
+ logger.trace("{} {} publisher setTCPKeepAlive({})", prefix, name, tcpKeepAlive);
+ socketPub.setTCPKeepAlive(tcpKeepAlive);
+ logger.trace("{} {} publisher setTCPKeepAliveCount({})", prefix, name, tcpKeepAliveCount);
+ socketPub.setTCPKeepAliveCount(tcpKeepAliveCount);
+ logger.trace("{} {} publisher setTCPKeepAliveIdle({})", prefix, name, tcpKeepAliveIdle);
+ socketPub.setTCPKeepAliveIdle(tcpKeepAliveIdle);
+ logger.trace("{} {} publisher setTCPKeepAliveInterval({})", prefix, name, tcpKeepAliveInterval);
+ socketPub.setTCPKeepAliveInterval(tcpKeepAliveInterval);
+ logger.trace("{} {} publisher setXpubVerbose({})", prefix, name, xpubVerbose);
+ socketPub.setXpubVerbose(xpubVerbose);
+ //
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Created JeroMQ {} publisher {} type {}, affinity={}, backlog={}, delayAttachOnConnect={}, events={}, IPv4Only={}, linger={}, maxMsgSize={}, multicastHops={}, "
+ + "rate={}, rcvHWM={}, receiveBufferSize={}, receiveTimeOut={}, reconnectIVL={}, reconnectIVLMax={}, recoveryInterval={}, sendBufferSize={}, "
+ + "sendTimeOut={}, sndHWM={}, TCPKeepAlive={}, TCPKeepAliveCount={}, TCPKeepAliveIdle={}, TCPKeepAliveInterval={}, TCPKeepAliveSetting={}",
+ name, socketPub, socketPub.getType(), socketPub.getAffinity(), socketPub.getBacklog(),
+ socketPub.getDelayAttachOnConnect(), socketPub.getEvents(), socketPub.getIPv4Only(),
+ socketPub.getLinger(), socketPub.getMaxMsgSize(), socketPub.getMulticastHops(), socketPub.getRate(),
+ socketPub.getRcvHWM(), socketPub.getReceiveBufferSize(), socketPub.getReceiveTimeOut(),
+ socketPub.getReconnectIVL(), socketPub.getReconnectIVLMax(), socketPub.getRecoveryInterval(),
+ socketPub.getSendBufferSize(), socketPub.getSendTimeOut(), socketPub.getSndHWM(),
+ socketPub.getTCPKeepAlive(), socketPub.getTCPKeepAliveCount(), socketPub.getTCPKeepAliveIdle(),
+ socketPub.getTCPKeepAliveInterval(), socketPub.getTCPKeepAliveSetting());
+ }
+ for (final String endpoint : endpoints) {
+ logger.debug("Binding {} appender {} to endpoint {}", SIMPLE_NAME, name, endpoint);
+ socketPub.bind(endpoint);
+ }
+ }
+
+ @Override
+ public synchronized void stop() {
+ super.stop();
+ final Socket socketPub = getPublisher();
+ if (socketPub != null) {
+ logger.debug("Closing {} appender {} publisher {}", SIMPLE_NAME, getName(), socketPub);
+ socketPub.close();
+ publisher = null;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "JeroMqAppender [context=" + context + ", publisher=" + publisher + ", endpoints=" + endpoints + "]";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java
new file mode 100644
index 0000000..041419c
--- /dev/null
+++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.jeromq;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.logging.log4j.core.Logger;
+import org.apache.logging.log4j.junit.LoggerContextRule;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class JeroMqAppenderTest {
+
+ @AfterClass
+ public static void tearDownClass() {
+ // JeroMqAppender.shutdown();
+ }
+
+ @ClassRule
+ public static LoggerContextRule ctx = new LoggerContextRule("JeroMqAppenderTest.xml");
+
+ @Test(timeout = 10000)
+ public void testAppenderLifeCycle() throws Exception {
+ // do nothing to make sure the appender starts and stops without
+ // locking up resources.
+ Assert.assertNotNull(JeroMqAppender.getContext());
+ }
+
+ @Test(timeout = 10000)
+ public void testClientServer() throws Exception {
+ final JeroMqAppender appender = ctx.getRequiredAppender("JeroMQAppender", JeroMqAppender.class);
+ final int expectedReceiveCount = 2;
+ final JeroMqTestClient client = new JeroMqTestClient(JeroMqAppender.getContext(), "tcp://localhost:5556", expectedReceiveCount);
+ final ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ final Future<List<String>> future = executor.submit(client);
+ Thread.sleep(100);
+ final Logger logger = ctx.getLogger(getClass().getName());
+ appender.resetSendRcs();
+ logger.info("Hello");
+ logger.info("Again");
+ final List<String> list = future.get();
+ Assert.assertEquals(expectedReceiveCount, appender.getSendRcTrue());
+ Assert.assertEquals(0, appender.getSendRcFalse());
+ Assert.assertEquals("Hello", list.get(0));
+ Assert.assertEquals("Again", list.get(1));
+ } finally {
+ executor.shutdown();
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testMultiThreadedServer() throws Exception {
+ final int nThreads = 10;
+ final JeroMqAppender appender = ctx.getRequiredAppender("JeroMQAppender", JeroMqAppender.class);
+ final int expectedReceiveCount = 2 * nThreads;
+ final JeroMqTestClient client = new JeroMqTestClient(JeroMqAppender.getContext(), "tcp://localhost:5556",
+ expectedReceiveCount);
+ final ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ final Future<List<String>> future = executor.submit(client);
+ Thread.sleep(100);
+ final Logger logger = ctx.getLogger(getClass().getName());
+ appender.resetSendRcs();
+ final ExecutorService fixedThreadPool = Executors.newFixedThreadPool(nThreads);
+ for (int i = 0; i < 10.; i++) {
+ fixedThreadPool.submit(new Runnable() {
+ @Override
+ public void run() {
+ logger.info("Hello");
+ logger.info("Again");
+ }
+ });
+ }
+ final List<String> list = future.get();
+ Assert.assertEquals(expectedReceiveCount, appender.getSendRcTrue());
+ Assert.assertEquals(0, appender.getSendRcFalse());
+ int hello = 0;
+ int again = 0;
+ for (final String string : list) {
+ switch (string) {
+ case "Hello":
+ hello++;
+ break;
+ case "Again":
+ again++;
+ break;
+ default:
+ Assert.fail("Unexpected message: " + string);
+ }
+ }
+ Assert.assertEquals(nThreads, hello);
+ Assert.assertEquals(nThreads, again);
+ } finally {
+ executor.shutdown();
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testServerOnly() throws Exception {
+ final Logger logger = ctx.getLogger(getClass().getName());
+ final JeroMqAppender appender = ctx.getRequiredAppender("JeroMQAppender", JeroMqAppender.class);
+ appender.resetSendRcs();
+ logger.info("Hello");
+ logger.info("Again");
+ Assert.assertEquals(2, appender.getSendRcTrue());
+ Assert.assertEquals(0, appender.getSendRcFalse());
+ }
+}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqTestClient.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqTestClient.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqTestClient.java
new file mode 100644
index 0000000..ddd06ab
--- /dev/null
+++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqTestClient.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.jeromq;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.zeromq.ZMQ;
+
+class JeroMqTestClient implements Callable<List<String>> {
+
+ private final ZMQ.Context context;
+
+ private final String endpoint;
+ private final List<String> messages;
+ private final int receiveCount;
+
+ JeroMqTestClient(final ZMQ.Context context, final String endpoint, final int receiveCount) {
+ super();
+ this.context = context;
+ this.endpoint = endpoint;
+ this.receiveCount = receiveCount;
+ this.messages = new ArrayList<>(receiveCount);
+ }
+
+ @Override
+ public List<String> call() throws Exception {
+ try (ZMQ.Socket subscriber = context.socket(ZMQ.SUB)) {
+ subscriber.connect(endpoint);
+ subscriber.subscribe(new byte[0]);
+ for (int messageNum = 0; messageNum < receiveCount
+ && !Thread.currentThread().isInterrupted(); messageNum++) {
+ // Use trim to remove the tailing '0' character
+ messages.add(subscriber.recvStr(0).trim());
+ }
+ }
+ return messages;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/log4j-core/src/test/java/org/apache/logging/log4j/junit/LoggerContextRule.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/junit/LoggerContextRule.java b/log4j-core/src/test/java/org/apache/logging/log4j/junit/LoggerContextRule.java
index 65836d6..006d70c 100644
--- a/log4j-core/src/test/java/org/apache/logging/log4j/junit/LoggerContextRule.java
+++ b/log4j-core/src/test/java/org/apache/logging/log4j/junit/LoggerContextRule.java
@@ -117,6 +117,17 @@ public class LoggerContextRule implements TestRule {
}
/**
+ * Gets a named Appender for this LoggerContext.
+ * @param <T> The target Appender class
+ * @param name the name of the Appender to look up.
+ * @param cls The target Appender class
+ * @return the named Appender or {@code null} if it wasn't defined in the configuration.
+ */
+ public <T extends Appender> T getAppender(final String name, Class<T> cls) {
+ return cls.cast(getConfiguration().getAppenders().get(name));
+ }
+
+ /**
* Gets a named Appender or throws an exception for this LoggerContext.
* @param name the name of the Appender to look up.
* @return the named Appender.
@@ -129,6 +140,20 @@ public class LoggerContextRule implements TestRule {
}
/**
+ * Gets a named Appender or throws an exception for this LoggerContext.
+ * @param <T> The target Appender class
+ * @param name the name of the Appender to look up.
+ * @param cls The target Appender class
+ * @return the named Appender.
+ * @throws AssertionError if the Appender doesn't exist.
+ */
+ public <T extends Appender> T getRequiredAppender(final String name, Class<T> cls) {
+ final T appender = getAppender(name, cls);
+ assertNotNull("Appender named " + name + " was null.", appender);
+ return appender;
+ }
+
+ /**
* Gets a named ListAppender or throws an exception for this LoggerContext.
* @param name the name of the ListAppender to look up.
* @return the named ListAppender.
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/log4j-core/src/test/resources/JeroMqAppenderTest.xml
----------------------------------------------------------------------
diff --git a/log4j-core/src/test/resources/JeroMqAppenderTest.xml b/log4j-core/src/test/resources/JeroMqAppenderTest.xml
new file mode 100644
index 0000000..119fc00
--- /dev/null
+++ b/log4j-core/src/test/resources/JeroMqAppenderTest.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache license, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the license for the specific language governing permissions and
+ ~ limitations under the license.
+ -->
+<Configuration name="JeroMQAppenderTest" status="TRACE">
+ <Appenders>
+ <JeroMQ name="JeroMQAppender">
+ <Property name="endpoint">tcp://*:5556</Property>
+ <Property name="endpoint">ipc://info-topic</Property>
+ </JeroMQ>
+ </Appenders>
+ <Loggers>
+ <Root level="info">
+ <AppenderRef ref="JeroMQAppender"/>
+ </Root>
+ </Loggers>
+</Configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 019b089..fbbb657 100644
--- a/pom.xml
+++ b/pom.xml
@@ -565,6 +565,11 @@
<version>0.8.2.1</version>
</dependency>
<dependency>
+ <groupId>org.zeromq</groupId>
+ <artifactId>jeromq</artifactId>
+ <version>0.3.5</version>
+ </dependency>
+ <dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/src/changes/changes.xml
----------------------------------------------------------------------
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index 5de063c..6d1d57e 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -40,6 +40,9 @@
<action issue="LOG4J2-1107" dev="ggregory" type="add" due-to="Mikael Ståldal">
New Appender for Apache Kafka.
</action>
+ <action issue="LOG4J2-1113" dev="ggregory" type="add" due-to="Gary Gregory">
+ New publisher Appender for ZeroMQ (using JeroMQ).
+ </action>
<action issue="LOG4J2-1088" dev="ggregory" type="add" due-to="Gary Gregory">
Add Comma Separated Value (CSV) layouts for parameter and event logging.
</action>
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/src/site/site.xml
----------------------------------------------------------------------
diff --git a/src/site/site.xml b/src/site/site.xml
index 3a2940d..7c82498 100644
--- a/src/site/site.xml
+++ b/src/site/site.xml
@@ -131,6 +131,7 @@
<item name="SMTP" href="/manual/appenders.html#SMTPAppender"/>
<item name="Socket" href="/manual/appenders.html#SocketAppender"/>
<item name="Syslog" href="/manual/appenders.html#SyslogAppender"/>
+ <item name="ZeroMQ" href="/manual/appenders.html#ZeroMQAppender"/>
</item>
<item name="Layouts" href="/manual/layouts.html" collapse="true">
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/src/site/xdoc/manual/appenders.xml
----------------------------------------------------------------------
diff --git a/src/site/xdoc/manual/appenders.xml b/src/site/xdoc/manual/appenders.xml
index a99ad88..49f7dec 100644
--- a/src/site/xdoc/manual/appenders.xml
+++ b/src/site/xdoc/manual/appenders.xml
@@ -3288,6 +3288,167 @@ public class JpaLogEntity extends AbstractLogEventWrapperEntity {
</Configuration>]]></pre>
</subsection>
+ <a name="ZeroMQAppender"/>
+ <subsection name="ZeroMQ Appender">
+ <p>
+ The ZeroMQ appender uses the <a href="https://github.com/zeromq/jeromq">JeroMQ</a> library to send log
+ events to one or more endpoints.
+ </p>
+ <p>
+ This is a simple JeroMQ configuration:
+ </p>
+ <pre class="prettyprint linenums"><![CDATA[<?xml version="1.0" encoding="UTF-8"?>
+<Configuration name="JeroMQAppenderTest" status="TRACE">
+ <Appenders>
+ <JeroMQ name="JeroMQAppender">
+ <Property name="endpoint">tcp://*:5556</Property>
+ <Property name="endpoint">ipc://info-topic</Property>
+ </JeroMQ>
+ </Appenders>
+ <Loggers>
+ <Root level="info">
+ <AppenderRef ref="JeroMQAppender"/>
+ </Root>
+ </Loggers>
+</Configuration>]]></pre>
+ <p>
+ The table below describes all options. Please consult the JeroMQ and ZeroMQ documentation for details.
+ </p>
+ <table>
+ <caption align="top">JeroMQ Parameters</caption>
+ <tr>
+ <th>Parameter Name</th>
+ <th>Type</th>
+ <th>Description</th>
+ </tr>
+ <tr>
+ <td>name</td>
+ <td>String</td>
+ <td>The name of the Appender.</td>
+ </tr>
+ <tr>
+ <td>Layout</td>
+ <td>Layout</td>
+ <td>The Layout of the Appender.</td>
+ </tr>
+ <tr>
+ <td>Filters</td>
+ <td>Filter</td>
+ <td>The Filters of the Appender.</td>
+ </tr>
+ <tr>
+ <td>Property</td>
+ <td>Property</td>
+ <td>One or more Property elements, named <code>endpoint</code>.</td>
+ </tr>
+ <tr>
+ <td>ignoreExceptions</td>
+ <td>boolean</td>
+ <td>If true, exceptions will be logged and suppressed. If false errors will be logged and then passed to the application.</td>
+ </tr>
+ <tr>
+ <td>affinity</td>
+ <td>long</td>
+ <td>The ZMQ_AFFINITY option. Defaults to 0.</td>
+ </tr>
+ <tr>
+ <td>backlog</td>
+ <td>long</td>
+ <td>The ZMQ_BACKLOG option. Defaults to 100.</td>
+ </tr>
+ <tr>
+ <td>delayAttachOnConnect</td>
+ <td>boolean</td>
+ <td>The ZMQ_DELAY_ATTACH_ON_CONNECT option. Defaults to false.</td>
+ </tr>
+ <tr>
+ <td>identity</td>
+ <td>byte[]</td>
+ <td>The ZMQ_IDENTITY option. Defaults to none.</td>
+ </tr>
+ <tr>
+ <td>ipv4Only</td>
+ <td>boolean</td>
+ <td>The ZMQ_IPV4ONLY option. Defaults to true.</td>
+ </tr>
+ <tr>
+ <td>linger</td>
+ <td>long</td>
+ <td>The ZMQ_LINGER option. Defaults to -1.</td>
+ </tr>
+ <tr>
+ <td>maxMsgSize</td>
+ <td>long</td>
+ <td>The ZMQ_MAXMSGSIZE option. Defaults to -1.</td>
+ </tr>
+ <tr>
+ <td>rcvHwm</td>
+ <td>long</td>
+ <td>The ZMQ_RCVHWM option. Defaults to 1000.</td>
+ </tr>
+ <tr>
+ <td>receiveBufferSize</td>
+ <td>long</td>
+ <td>The ZMQ_RCVBUF option. Defaults to 0.</td>
+ </tr>
+ <tr>
+ <td>receiveTimeOut</td>
+ <td>int</td>
+ <td>The ZMQ_RCVTIMEO option. Defaults to -1.</td>
+ </tr>
+ <tr>
+ <td>reconnectIVL</td>
+ <td>long</td>
+ <td>The ZMQ_RECONNECT_IVL option. Defaults to 100.</td>
+ </tr>
+ <tr>
+ <td>reconnectIVLMax</td>
+ <td>long</td>
+ <td>The ZMQ_RECONNECT_IVL_MAX option. Defaults to 0.</td>
+ </tr>
+ <tr>
+ <td>sendBufferSize</td>
+ <td>long</td>
+ <td>The ZMQ_SNDBUF option. Defaults to 0.</td>
+ </tr>
+ <tr>
+ <td>sendTimeOut</td>
+ <td>int</td>
+ <td>The ZMQ_SNDTIMEO option. Defaults to -1.</td>
+ </tr>
+ <tr>
+ <td>sndHwm</td>
+ <td>long</td>
+ <td>The ZMQ_SNDHWM option. Defaults to 1000.</td>
+ </tr>
+ <tr>
+ <td>tcpKeepAlive</td>
+ <td>int</td>
+ <td>The ZMQ_TCP_KEEPALIVE option. Defaults to -1.</td>
+ </tr>
+ <tr>
+ <td>tcpKeepAliveCount</td>
+ <td>long</td>
+ <td>The ZMQ_TCP_KEEPALIVE_CNT option. Defaults to -1.</td>
+ </tr>
+ <tr>
+ <td>tcpKeepAliveIdle</td>
+ <td>long</td>
+ <td>The ZMQ_TCP_KEEPALIVE_IDLE option. Defaults to -1.</td>
+ </tr>
+ <tr>
+ <td>tcpKeepAliveInterval</td>
+ <td>long</td>
+ <td>The ZMQ_TCP_KEEPALIVE_INTVL option. Defaults to -1.</td>
+ </tr>
+ <tr>
+ <td>xpubVerbose</td>
+ <td>boolean</td>
+ <td>The ZMQ_XPUB_VERBOSE option. Defaults to false.</td>
+ </tr>
+ </table>
+ </subsection>
+
</section>
</body>
</document>
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/src/site/xdoc/runtime-dependencies.xml
----------------------------------------------------------------------
diff --git a/src/site/xdoc/runtime-dependencies.xml b/src/site/xdoc/runtime-dependencies.xml
index bbee62a..ace4141 100644
--- a/src/site/xdoc/runtime-dependencies.xml
+++ b/src/site/xdoc/runtime-dependencies.xml
@@ -100,6 +100,14 @@
In addition, XZ requires <a href="http://tukaani.org/xz/java.html">XZ for Java</a>.
</td>
</tr>
+ <tr>
+ <td>ZeroMQ Appender</td>
+ <td>
+ The ZeroMQ appender uses the <a href="https://github.com/zeromq/jeromq">JeroMQ</a> library which is
+ licensed under the terms of the GNU Lesser General Public License (LGPL). For details see the
+ files <code>COPYING</code> and <code>COPYING.LESSER</code> included with the JeroMQ distribution.
+ </td>
+ </tr>
</table>
<a name="log4j-jcl" />