You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by rp...@apache.org on 2016/09/08 14:51:14 UTC

[45/50] [abbrv] logging-log4j2 git commit: Properly track resources during shutdown.

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/78fe32c0/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
index 5a60653..96b2f57 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
@@ -1,185 +1,185 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache license, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the license for the specific language governing permissions and
- * limitations under the license.
- */
-
-package org.apache.logging.log4j.core.appender.mom.jeromq;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.logging.log4j.core.Appender;
-import org.apache.logging.log4j.core.Filter;
-import org.apache.logging.log4j.core.Layout;
-import org.apache.logging.log4j.core.LogEvent;
-import org.apache.logging.log4j.core.appender.AbstractAppender;
-import org.apache.logging.log4j.core.config.Node;
-import org.apache.logging.log4j.core.config.Property;
-import org.apache.logging.log4j.core.config.plugins.Plugin;
-import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
-import org.apache.logging.log4j.core.config.plugins.PluginElement;
-import org.apache.logging.log4j.core.config.plugins.PluginFactory;
-import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
-import org.apache.logging.log4j.core.layout.PatternLayout;
-import org.apache.logging.log4j.util.Strings;
-
-/**
- * Sends log events to one or more ZeroMQ (JeroMQ) endpoints.
- * <p>
- * Requires the JeroMQ jar (LGPL as of 0.3.5)
- * </p>
- */
-// TODO
-// Some methods are synchronized because a ZMQ.Socket is not thread-safe
-// Using a ThreadLocal for the publisher hangs tests on shutdown. There must be
-// some issue on threads owning certain resources as opposed to others.
-@Plugin(name = "JeroMQ", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
-public final class JeroMqAppender extends AbstractAppender {
-
-    private static final int DEFAULT_BACKLOG = 100;
-
-    private static final int DEFAULT_IVL = 100;
-
-    private static final int DEFAULT_RCV_HWM = 1000;
-
-    private static final int DEFAULT_SND_HWM = 1000;
-
-    private final JeroMqManager manager;
-    private final List<String> endpoints;
-    private int sendRcFalse;
-    private int sendRcTrue;
-
-    private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
-            final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog,
-            final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger,
-            final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut,
-            final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut,
-            final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
-            final long tcpKeepAliveInterval, final boolean xpubVerbose) {
-        super(name, filter, layout, ignoreExceptions);
-        this.manager = JeroMqManager.getJeroMqManager(name, affinity, backlog, delayAttachOnConnect, identity, ipv4Only,
-            linger, maxMsgSize, rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax,
-            sendBufferSize, sendTimeOut, sndHWM, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle,
-            tcpKeepAliveInterval, xpubVerbose, endpoints);
-        this.endpoints = endpoints;
-    }
-
-    // The ZMQ.Socket class has other set methods that we do not cover because
-    // they throw unsupported operation exceptions.
-    @PluginFactory
-    public static JeroMqAppender createAppender(
-            // @formatter:off
-            @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name,
-            @PluginElement("Layout") Layout<?> layout,
-            @PluginElement("Filter") final Filter filter,
-            @PluginElement("Properties") final Property[] properties,
-            // Super attributes
-            @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions,
-            // ZMQ attributes; defaults picked from zmq.Options.
-            @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity,
-            @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog,
-            @PluginAttribute(value = "delayAttachOnConnect", defaultBoolean = false) final boolean delayAttachOnConnect,
-            @PluginAttribute(value = "identity") final byte[] identity,
-            @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only,
-            @PluginAttribute(value = "linger", defaultLong = -1) final long linger,
-            @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize,
-            @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm,
-            @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize,
-            @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut,
-            @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL,
-            @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax,
-            @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize,
-            @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut,
-            @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm,
-            @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive,
-            @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount,
-            @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle,
-            @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval,
-            @PluginAttribute(value = "xpubVerbose", defaultBoolean = false) final boolean xpubVerbose
-            // @formatter:on
-    ) {
-        if (layout == null) {
-            layout = PatternLayout.createDefaultLayout();
-        }
-        List<String> endpoints;
-        if (properties == null) {
-            endpoints = new ArrayList<>(0);
-        } else {
-            endpoints = new ArrayList<>(properties.length);
-            for (final Property property : properties) {
-                if ("endpoint".equalsIgnoreCase(property.getName())) {
-                    final String value = property.getValue();
-                    if (Strings.isNotEmpty(value)) {
-                        endpoints.add(value);
-                    }
-                }
-            }
-        }
-        LOGGER.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}",
-                name, filter, layout, ignoreExceptions, endpoints);
-        return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog,
-                delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize,
-                receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive,
-                tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose);
-    }
-
-    @Override
-    public synchronized void append(final LogEvent event) {
-        final Layout<? extends Serializable> layout = getLayout();
-        final byte[] formattedMessage = layout.toByteArray(event);
-        if (manager.send(getLayout().toByteArray(event))) {
-            sendRcTrue++;
-        } else {
-            sendRcFalse++;
-            LOGGER.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, formattedMessage);
-        }
-    }
-
-    @Override
-    public boolean stop(final long timeout, final TimeUnit timeUnit) {
-        setStopping();
-        super.stop(timeout, timeUnit, false);
-        manager.stop(timeout, timeUnit);
-        setStopped();
-        return true;
-    }
-
-    // not public, handy for testing
-    int getSendRcFalse() {
-        return sendRcFalse;
-    }
-
-    // not public, handy for testing
-    int getSendRcTrue() {
-        return sendRcTrue;
-    }
-
-    // not public, handy for testing
-    void resetSendRcs() {
-        sendRcTrue = sendRcFalse = 0;
-    }
-
-    @Override
-    public String toString() {
-        return "JeroMqAppender{" +
-            "name=" + getName() +
-            ", state=" + getState() +
-            ", manager=" + manager +
-            ", endpoints=" + endpoints +
-            '}';
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.jeromq;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.Layout;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.config.Node;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginElement;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+import org.apache.logging.log4j.util.Strings;
+
+/**
+ * Sends log events to one or more ZeroMQ (JeroMQ) endpoints.
+ * <p>
+ * Requires the JeroMQ jar (LGPL as of 0.3.5)
+ * </p>
+ */
+// TODO
+// Some methods are synchronized because a ZMQ.Socket is not thread-safe
+// Using a ThreadLocal for the publisher hangs tests on shutdown. There must be
+// some issue on threads owning certain resources as opposed to others.
+@Plugin(name = "JeroMQ", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
+public final class JeroMqAppender extends AbstractAppender {
+
+    private static final int DEFAULT_BACKLOG = 100;
+
+    private static final int DEFAULT_IVL = 100;
+
+    private static final int DEFAULT_RCV_HWM = 1000;
+
+    private static final int DEFAULT_SND_HWM = 1000;
+
+    private final JeroMqManager manager;
+    private final List<String> endpoints;
+    private int sendRcFalse;
+    private int sendRcTrue;
+
+    private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
+            final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog,
+            final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger,
+            final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut,
+            final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut,
+            final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
+            final long tcpKeepAliveInterval, final boolean xpubVerbose) {
+        super(name, filter, layout, ignoreExceptions);
+        this.manager = JeroMqManager.getJeroMqManager(name, affinity, backlog, delayAttachOnConnect, identity, ipv4Only,
+            linger, maxMsgSize, rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax,
+            sendBufferSize, sendTimeOut, sndHWM, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle,
+            tcpKeepAliveInterval, xpubVerbose, endpoints);
+        this.endpoints = endpoints;
+    }
+
+    // The ZMQ.Socket class has other set methods that we do not cover because
+    // they throw unsupported operation exceptions.
+    @PluginFactory
+    public static JeroMqAppender createAppender(
+            // @formatter:off
+            @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name,
+            @PluginElement("Layout") Layout<?> layout,
+            @PluginElement("Filter") final Filter filter,
+            @PluginElement("Properties") final Property[] properties,
+            // Super attributes
+            @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions,
+            // ZMQ attributes; defaults picked from zmq.Options.
+            @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity,
+            @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog,
+            @PluginAttribute(value = "delayAttachOnConnect", defaultBoolean = false) final boolean delayAttachOnConnect,
+            @PluginAttribute(value = "identity") final byte[] identity,
+            @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only,
+            @PluginAttribute(value = "linger", defaultLong = -1) final long linger,
+            @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize,
+            @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm,
+            @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize,
+            @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut,
+            @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL,
+            @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax,
+            @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize,
+            @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut,
+            @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm,
+            @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive,
+            @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount,
+            @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle,
+            @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval,
+            @PluginAttribute(value = "xpubVerbose", defaultBoolean = false) final boolean xpubVerbose
+            // @formatter:on
+    ) {
+        if (layout == null) {
+            layout = PatternLayout.createDefaultLayout();
+        }
+        List<String> endpoints;
+        if (properties == null) {
+            endpoints = new ArrayList<>(0);
+        } else {
+            endpoints = new ArrayList<>(properties.length);
+            for (final Property property : properties) {
+                if ("endpoint".equalsIgnoreCase(property.getName())) {
+                    final String value = property.getValue();
+                    if (Strings.isNotEmpty(value)) {
+                        endpoints.add(value);
+                    }
+                }
+            }
+        }
+        LOGGER.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}",
+                name, filter, layout, ignoreExceptions, endpoints);
+        return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog,
+                delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize,
+                receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive,
+                tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose);
+    }
+
+    @Override
+    public synchronized void append(final LogEvent event) {
+        final Layout<? extends Serializable> layout = getLayout();
+        final byte[] formattedMessage = layout.toByteArray(event);
+        if (manager.send(getLayout().toByteArray(event))) {
+            sendRcTrue++;
+        } else {
+            sendRcFalse++;
+            LOGGER.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, formattedMessage);
+        }
+    }
+
+    @Override
+    public boolean stop(final long timeout, final TimeUnit timeUnit) {
+        setStopping();
+        boolean stopped = super.stop(timeout, timeUnit, false);
+        stopped &= manager.stop(timeout, timeUnit);
+        setStopped();
+        return stopped;
+    }
+
+    // not public, handy for testing
+    int getSendRcFalse() {
+        return sendRcFalse;
+    }
+
+    // not public, handy for testing
+    int getSendRcTrue() {
+        return sendRcTrue;
+    }
+
+    // not public, handy for testing
+    void resetSendRcs() {
+        sendRcTrue = sendRcFalse = 0;
+    }
+
+    @Override
+    public String toString() {
+        return "JeroMqAppender{" +
+            "name=" + getName() +
+            ", state=" + getState() +
+            ", manager=" + manager +
+            ", endpoints=" + endpoints +
+            '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/78fe32c0/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
index f731d61..a438faf 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
@@ -1,221 +1,222 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache license, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the license for the specific language governing permissions and
- * limitations under the license.
- */
-
-package org.apache.logging.log4j.core.appender.mom.jeromq;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.core.appender.AbstractManager;
-import org.apache.logging.log4j.core.appender.ManagerFactory;
-import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry;
-import org.apache.logging.log4j.util.PropertiesUtil;
-import org.zeromq.ZMQ;
-
-/**
- * Manager for publishing messages via JeroMq.
- *
- * @since 2.6
- */
-public class JeroMqManager extends AbstractManager {
-
-    /**
-     * System property to enable shutdown hook.
-     */
-    public static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook";
-
-    /**
-     * System property to control JeroMQ I/O thread count.
-     */
-    public static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
-
-    private static final JeroMqManagerFactory FACTORY = new JeroMqManagerFactory();
-    private static final ZMQ.Context CONTEXT;
-
-    static {
-        LOGGER.trace("JeroMqManager using ZMQ version {}", ZMQ.getVersionString());
-
-        final int ioThreads = PropertiesUtil.getProperties().getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
-        LOGGER.trace("JeroMqManager creating ZMQ context with ioThreads = {}", ioThreads);
-        CONTEXT = ZMQ.context(ioThreads);
-
-        final boolean enableShutdownHook = PropertiesUtil.getProperties().getBooleanProperty(
-            SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
-        if (enableShutdownHook) {
-            ((ShutdownCallbackRegistry) LogManager.getFactory()).addShutdownCallback(new Runnable() {
-                @Override
-                public void run() {
-                    CONTEXT.close();
-                }
-            });
-        }
-    }
-
-    private final ZMQ.Socket publisher;
-
-    private JeroMqManager(final String name, final JeroMqConfiguration config) {
-        super(null, name);
-        publisher = CONTEXT.socket(ZMQ.PUB);
-        publisher.setAffinity(config.affinity);
-        publisher.setBacklog(config.backlog);
-        publisher.setDelayAttachOnConnect(config.delayAttachOnConnect);
-        if (config.identity != null) {
-            publisher.setIdentity(config.identity);
-        }
-        publisher.setIPv4Only(config.ipv4Only);
-        publisher.setLinger(config.linger);
-        publisher.setMaxMsgSize(config.maxMsgSize);
-        publisher.setRcvHWM(config.rcvHwm);
-        publisher.setReceiveBufferSize(config.receiveBufferSize);
-        publisher.setReceiveTimeOut(config.receiveTimeOut);
-        publisher.setReconnectIVL(config.reconnectIVL);
-        publisher.setReconnectIVLMax(config.reconnectIVLMax);
-        publisher.setSendBufferSize(config.sendBufferSize);
-        publisher.setSendTimeOut(config.sendTimeOut);
-        publisher.setSndHWM(config.sndHwm);
-        publisher.setTCPKeepAlive(config.tcpKeepAlive);
-        publisher.setTCPKeepAliveCount(config.tcpKeepAliveCount);
-        publisher.setTCPKeepAliveIdle(config.tcpKeepAliveIdle);
-        publisher.setTCPKeepAliveInterval(config.tcpKeepAliveInterval);
-        publisher.setXpubVerbose(config.xpubVerbose);
-        for (final String endpoint : config.endpoints) {
-            publisher.bind(endpoint);
-        }
-        LOGGER.debug("Created JeroMqManager with {}", config);
-    }
-
-    public boolean send(final byte[] data) {
-        return publisher.send(data);
-    }
-
-    @Override
-    protected void releaseSub(final long timeout, final TimeUnit timeUnit) {
-        publisher.close();
-    }
-
-    public static JeroMqManager getJeroMqManager(final String name, final long affinity, final long backlog,
-                                                 final boolean delayAttachOnConnect, final byte[] identity,
-                                                 final boolean ipv4Only, final long linger, final long maxMsgSize,
-                                                 final long rcvHwm, final long receiveBufferSize,
-                                                 final int receiveTimeOut, final long reconnectIVL,
-                                                 final long reconnectIVLMax, final long sendBufferSize,
-                                                 final int sendTimeOut, final long sndHwm, final int tcpKeepAlive,
-                                                 final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
-                                                 final long tcpKeepAliveInterval, final boolean xpubVerbose,
-                                                 final List<String> endpoints) {
-        return getManager(name, FACTORY,
-            new JeroMqConfiguration(affinity, backlog, delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize,
-                rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut,
-                sndHwm, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose,
-                endpoints));
-    }
-
-    public static ZMQ.Context getContext() {
-        return CONTEXT;
-    }
-
-    private static class JeroMqConfiguration {
-        private final long affinity;
-        private final long backlog;
-        private final boolean delayAttachOnConnect;
-        private final byte[] identity;
-        private final boolean ipv4Only;
-        private final long linger;
-        private final long maxMsgSize;
-        private final long rcvHwm;
-        private final long receiveBufferSize;
-        private final int receiveTimeOut;
-        private final long reconnectIVL;
-        private final long reconnectIVLMax;
-        private final long sendBufferSize;
-        private final int sendTimeOut;
-        private final long sndHwm;
-        private final int tcpKeepAlive;
-        private final long tcpKeepAliveCount;
-        private final long tcpKeepAliveIdle;
-        private final long tcpKeepAliveInterval;
-        private final boolean xpubVerbose;
-        private final List<String> endpoints;
-
-        private JeroMqConfiguration(final long affinity, final long backlog, final boolean delayAttachOnConnect,
-                                    final byte[] identity, final boolean ipv4Only, final long linger,
-                                    final long maxMsgSize, final long rcvHwm, final long receiveBufferSize,
-                                    final int receiveTimeOut, final long reconnectIVL, final long reconnectIVLMax,
-                                    final long sendBufferSize, final int sendTimeOut, final long sndHwm,
-                                    final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
-                                    final long tcpKeepAliveInterval, final boolean xpubVerbose,
-                                    final List<String> endpoints) {
-            this.affinity = affinity;
-            this.backlog = backlog;
-            this.delayAttachOnConnect = delayAttachOnConnect;
-            this.identity = identity;
-            this.ipv4Only = ipv4Only;
-            this.linger = linger;
-            this.maxMsgSize = maxMsgSize;
-            this.rcvHwm = rcvHwm;
-            this.receiveBufferSize = receiveBufferSize;
-            this.receiveTimeOut = receiveTimeOut;
-            this.reconnectIVL = reconnectIVL;
-            this.reconnectIVLMax = reconnectIVLMax;
-            this.sendBufferSize = sendBufferSize;
-            this.sendTimeOut = sendTimeOut;
-            this.sndHwm = sndHwm;
-            this.tcpKeepAlive = tcpKeepAlive;
-            this.tcpKeepAliveCount = tcpKeepAliveCount;
-            this.tcpKeepAliveIdle = tcpKeepAliveIdle;
-            this.tcpKeepAliveInterval = tcpKeepAliveInterval;
-            this.xpubVerbose = xpubVerbose;
-            this.endpoints = endpoints;
-        }
-
-        @Override
-        public String toString() {
-            return "JeroMqConfiguration{" +
-                "affinity=" + affinity +
-                ", backlog=" + backlog +
-                ", delayAttachOnConnect=" + delayAttachOnConnect +
-                ", identity=" + Arrays.toString(identity) +
-                ", ipv4Only=" + ipv4Only +
-                ", linger=" + linger +
-                ", maxMsgSize=" + maxMsgSize +
-                ", rcvHwm=" + rcvHwm +
-                ", receiveBufferSize=" + receiveBufferSize +
-                ", receiveTimeOut=" + receiveTimeOut +
-                ", reconnectIVL=" + reconnectIVL +
-                ", reconnectIVLMax=" + reconnectIVLMax +
-                ", sendBufferSize=" + sendBufferSize +
-                ", sendTimeOut=" + sendTimeOut +
-                ", sndHwm=" + sndHwm +
-                ", tcpKeepAlive=" + tcpKeepAlive +
-                ", tcpKeepAliveCount=" + tcpKeepAliveCount +
-                ", tcpKeepAliveIdle=" + tcpKeepAliveIdle +
-                ", tcpKeepAliveInterval=" + tcpKeepAliveInterval +
-                ", xpubVerbose=" + xpubVerbose +
-                ", endpoints=" + endpoints +
-                '}';
-        }
-    }
-
-    private static class JeroMqManagerFactory implements ManagerFactory<JeroMqManager, JeroMqConfiguration> {
-        @Override
-        public JeroMqManager createManager(final String name, final JeroMqConfiguration data) {
-            return new JeroMqManager(name, data);
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.jeromq;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.appender.AbstractManager;
+import org.apache.logging.log4j.core.appender.ManagerFactory;
+import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry;
+import org.apache.logging.log4j.util.PropertiesUtil;
+import org.zeromq.ZMQ;
+
+/**
+ * Manager for publishing messages via JeroMq.
+ *
+ * @since 2.6
+ */
+public class JeroMqManager extends AbstractManager {
+
+    /**
+     * System property to enable shutdown hook.
+     */
+    public static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook";
+
+    /**
+     * System property to control JeroMQ I/O thread count.
+     */
+    public static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
+
+    private static final JeroMqManagerFactory FACTORY = new JeroMqManagerFactory();
+    private static final ZMQ.Context CONTEXT;
+
+    static {
+        LOGGER.trace("JeroMqManager using ZMQ version {}", ZMQ.getVersionString());
+
+        final int ioThreads = PropertiesUtil.getProperties().getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
+        LOGGER.trace("JeroMqManager creating ZMQ context with ioThreads = {}", ioThreads);
+        CONTEXT = ZMQ.context(ioThreads);
+
+        final boolean enableShutdownHook = PropertiesUtil.getProperties().getBooleanProperty(
+            SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
+        if (enableShutdownHook) {
+            ((ShutdownCallbackRegistry) LogManager.getFactory()).addShutdownCallback(new Runnable() {
+                @Override
+                public void run() {
+                    CONTEXT.close();
+                }
+            });
+        }
+    }
+
+    private final ZMQ.Socket publisher;
+
+    private JeroMqManager(final String name, final JeroMqConfiguration config) {
+        super(null, name);
+        publisher = CONTEXT.socket(ZMQ.PUB);
+        publisher.setAffinity(config.affinity);
+        publisher.setBacklog(config.backlog);
+        publisher.setDelayAttachOnConnect(config.delayAttachOnConnect);
+        if (config.identity != null) {
+            publisher.setIdentity(config.identity);
+        }
+        publisher.setIPv4Only(config.ipv4Only);
+        publisher.setLinger(config.linger);
+        publisher.setMaxMsgSize(config.maxMsgSize);
+        publisher.setRcvHWM(config.rcvHwm);
+        publisher.setReceiveBufferSize(config.receiveBufferSize);
+        publisher.setReceiveTimeOut(config.receiveTimeOut);
+        publisher.setReconnectIVL(config.reconnectIVL);
+        publisher.setReconnectIVLMax(config.reconnectIVLMax);
+        publisher.setSendBufferSize(config.sendBufferSize);
+        publisher.setSendTimeOut(config.sendTimeOut);
+        publisher.setSndHWM(config.sndHwm);
+        publisher.setTCPKeepAlive(config.tcpKeepAlive);
+        publisher.setTCPKeepAliveCount(config.tcpKeepAliveCount);
+        publisher.setTCPKeepAliveIdle(config.tcpKeepAliveIdle);
+        publisher.setTCPKeepAliveInterval(config.tcpKeepAliveInterval);
+        publisher.setXpubVerbose(config.xpubVerbose);
+        for (final String endpoint : config.endpoints) {
+            publisher.bind(endpoint);
+        }
+        LOGGER.debug("Created JeroMqManager with {}", config);
+    }
+
+    public boolean send(final byte[] data) {
+        return publisher.send(data);
+    }
+
+    @Override
+    protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
+        publisher.close();
+        return true;
+    }
+
+    public static JeroMqManager getJeroMqManager(final String name, final long affinity, final long backlog,
+                                                 final boolean delayAttachOnConnect, final byte[] identity,
+                                                 final boolean ipv4Only, final long linger, final long maxMsgSize,
+                                                 final long rcvHwm, final long receiveBufferSize,
+                                                 final int receiveTimeOut, final long reconnectIVL,
+                                                 final long reconnectIVLMax, final long sendBufferSize,
+                                                 final int sendTimeOut, final long sndHwm, final int tcpKeepAlive,
+                                                 final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
+                                                 final long tcpKeepAliveInterval, final boolean xpubVerbose,
+                                                 final List<String> endpoints) {
+        return getManager(name, FACTORY,
+            new JeroMqConfiguration(affinity, backlog, delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize,
+                rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut,
+                sndHwm, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose,
+                endpoints));
+    }
+
+    public static ZMQ.Context getContext() {
+        return CONTEXT;
+    }
+
+    private static class JeroMqConfiguration {
+        private final long affinity;
+        private final long backlog;
+        private final boolean delayAttachOnConnect;
+        private final byte[] identity;
+        private final boolean ipv4Only;
+        private final long linger;
+        private final long maxMsgSize;
+        private final long rcvHwm;
+        private final long receiveBufferSize;
+        private final int receiveTimeOut;
+        private final long reconnectIVL;
+        private final long reconnectIVLMax;
+        private final long sendBufferSize;
+        private final int sendTimeOut;
+        private final long sndHwm;
+        private final int tcpKeepAlive;
+        private final long tcpKeepAliveCount;
+        private final long tcpKeepAliveIdle;
+        private final long tcpKeepAliveInterval;
+        private final boolean xpubVerbose;
+        private final List<String> endpoints;
+
+        private JeroMqConfiguration(final long affinity, final long backlog, final boolean delayAttachOnConnect,
+                                    final byte[] identity, final boolean ipv4Only, final long linger,
+                                    final long maxMsgSize, final long rcvHwm, final long receiveBufferSize,
+                                    final int receiveTimeOut, final long reconnectIVL, final long reconnectIVLMax,
+                                    final long sendBufferSize, final int sendTimeOut, final long sndHwm,
+                                    final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
+                                    final long tcpKeepAliveInterval, final boolean xpubVerbose,
+                                    final List<String> endpoints) {
+            this.affinity = affinity;
+            this.backlog = backlog;
+            this.delayAttachOnConnect = delayAttachOnConnect;
+            this.identity = identity;
+            this.ipv4Only = ipv4Only;
+            this.linger = linger;
+            this.maxMsgSize = maxMsgSize;
+            this.rcvHwm = rcvHwm;
+            this.receiveBufferSize = receiveBufferSize;
+            this.receiveTimeOut = receiveTimeOut;
+            this.reconnectIVL = reconnectIVL;
+            this.reconnectIVLMax = reconnectIVLMax;
+            this.sendBufferSize = sendBufferSize;
+            this.sendTimeOut = sendTimeOut;
+            this.sndHwm = sndHwm;
+            this.tcpKeepAlive = tcpKeepAlive;
+            this.tcpKeepAliveCount = tcpKeepAliveCount;
+            this.tcpKeepAliveIdle = tcpKeepAliveIdle;
+            this.tcpKeepAliveInterval = tcpKeepAliveInterval;
+            this.xpubVerbose = xpubVerbose;
+            this.endpoints = endpoints;
+        }
+
+        @Override
+        public String toString() {
+            return "JeroMqConfiguration{" +
+                "affinity=" + affinity +
+                ", backlog=" + backlog +
+                ", delayAttachOnConnect=" + delayAttachOnConnect +
+                ", identity=" + Arrays.toString(identity) +
+                ", ipv4Only=" + ipv4Only +
+                ", linger=" + linger +
+                ", maxMsgSize=" + maxMsgSize +
+                ", rcvHwm=" + rcvHwm +
+                ", receiveBufferSize=" + receiveBufferSize +
+                ", receiveTimeOut=" + receiveTimeOut +
+                ", reconnectIVL=" + reconnectIVL +
+                ", reconnectIVLMax=" + reconnectIVLMax +
+                ", sendBufferSize=" + sendBufferSize +
+                ", sendTimeOut=" + sendTimeOut +
+                ", sndHwm=" + sndHwm +
+                ", tcpKeepAlive=" + tcpKeepAlive +
+                ", tcpKeepAliveCount=" + tcpKeepAliveCount +
+                ", tcpKeepAliveIdle=" + tcpKeepAliveIdle +
+                ", tcpKeepAliveInterval=" + tcpKeepAliveInterval +
+                ", xpubVerbose=" + xpubVerbose +
+                ", endpoints=" + endpoints +
+                '}';
+        }
+    }
+
+    private static class JeroMqManagerFactory implements ManagerFactory<JeroMqManager, JeroMqConfiguration> {
+        @Override
+        public JeroMqManager createManager(final String name, final JeroMqConfiguration data) {
+            return new JeroMqManager(name, data);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/78fe32c0/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
index bee9437..787f655 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
@@ -1,120 +1,120 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache license, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the license for the specific language governing permissions and
- * limitations under the license.
- */
-
-package org.apache.logging.log4j.core.appender.mom.kafka;
-
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.logging.log4j.core.Appender;
-import org.apache.logging.log4j.core.Filter;
-import org.apache.logging.log4j.core.Layout;
-import org.apache.logging.log4j.core.LogEvent;
-import org.apache.logging.log4j.core.appender.AbstractAppender;
-import org.apache.logging.log4j.core.appender.AppenderLoggingException;
-import org.apache.logging.log4j.core.config.Configuration;
-import org.apache.logging.log4j.core.config.Node;
-import org.apache.logging.log4j.core.config.Property;
-import org.apache.logging.log4j.core.config.plugins.Plugin;
-import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
-import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
-import org.apache.logging.log4j.core.config.plugins.PluginElement;
-import org.apache.logging.log4j.core.config.plugins.PluginFactory;
-import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
-import org.apache.logging.log4j.core.layout.SerializedLayout;
-import org.apache.logging.log4j.core.util.StringEncoder;
-
-/**
- * Sends log events to an Apache Kafka topic.
- */
-@Plugin(name = "Kafka", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
-public final class KafkaAppender extends AbstractAppender {
-
-    @PluginFactory
-    public static KafkaAppender createAppender(
-            @PluginElement("Layout") final Layout<? extends Serializable> layout,
-            @PluginElement("Filter") final Filter filter,
-            @Required(message = "No name provided for KafkaAppender") @PluginAttribute("name") final String name,
-            @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
-            @Required(message = "No topic provided for KafkaAppender") @PluginAttribute("topic") final String topic,
-            @PluginElement("Properties") final Property[] properties,
-            @PluginConfiguration final Configuration configuration) {
-        final KafkaManager kafkaManager = new KafkaManager(configuration.getLoggerContext(), name, topic, properties);
-        return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager);
-    }
-
-    private final KafkaManager manager;
-
-    private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter, final boolean ignoreExceptions, final KafkaManager manager) {
-        super(name, filter, layout, ignoreExceptions);
-        this.manager = manager;
-    }
-
-    @Override
-    public void append(final LogEvent event) {
-        if (event.getLoggerName().startsWith("org.apache.kafka")) {
-            LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
-        } else {
-            try {
-                final Layout<? extends Serializable> layout = getLayout();
-                byte[] data;
-                if (layout != null) {
-                    if (layout instanceof SerializedLayout) {
-                        final byte[] header = layout.getHeader();
-                        final byte[] body = layout.toByteArray(event);
-                        data = new byte[header.length + body.length];
-                        System.arraycopy(header, 0, data, 0, header.length);
-                        System.arraycopy(body, 0, data, header.length, body.length);
-                    } else {
-                        data = layout.toByteArray(event);
-                    }
-                } else {
-                    data = StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8);
-                }
-                manager.send(data);
-            } catch (final Exception e) {
-                LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
-                throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
-            }
-        }
-    }
-
-    @Override
-    public void start() {
-        super.start();
-        manager.startup();
-    }
-
-    @Override
-    public boolean stop(final long timeout, final TimeUnit timeUnit) {
-        setStopping();
-        super.stop(timeout, timeUnit, false);
-        manager.stop(timeout, timeUnit);
-        setStopped();
-        return true;
-    }
-
-    @Override
-    public String toString() {
-        return "KafkaAppender{" +
-            "name=" + getName() +
-            ", state=" + getState() +
-            ", topic=" + manager.getTopic() +
-            '}';
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.kafka;
+
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.Layout;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.appender.AppenderLoggingException;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.Node;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
+import org.apache.logging.log4j.core.config.plugins.PluginElement;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
+import org.apache.logging.log4j.core.layout.SerializedLayout;
+import org.apache.logging.log4j.core.util.StringEncoder;
+
+/**
+ * Sends log events to an Apache Kafka topic.
+ */
+@Plugin(name = "Kafka", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
+public final class KafkaAppender extends AbstractAppender {
+
+    @PluginFactory
+    public static KafkaAppender createAppender(
+            @PluginElement("Layout") final Layout<? extends Serializable> layout,
+            @PluginElement("Filter") final Filter filter,
+            @Required(message = "No name provided for KafkaAppender") @PluginAttribute("name") final String name,
+            @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
+            @Required(message = "No topic provided for KafkaAppender") @PluginAttribute("topic") final String topic,
+            @PluginElement("Properties") final Property[] properties,
+            @PluginConfiguration final Configuration configuration) {
+        final KafkaManager kafkaManager = new KafkaManager(configuration.getLoggerContext(), name, topic, properties);
+        return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager);
+    }
+
+    private final KafkaManager manager;
+
+    private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter, final boolean ignoreExceptions, final KafkaManager manager) {
+        super(name, filter, layout, ignoreExceptions);
+        this.manager = manager;
+    }
+
+    @Override
+    public void append(final LogEvent event) {
+        if (event.getLoggerName().startsWith("org.apache.kafka")) {
+            LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
+        } else {
+            try {
+                final Layout<? extends Serializable> layout = getLayout();
+                byte[] data;
+                if (layout != null) {
+                    if (layout instanceof SerializedLayout) {
+                        final byte[] header = layout.getHeader();
+                        final byte[] body = layout.toByteArray(event);
+                        data = new byte[header.length + body.length];
+                        System.arraycopy(header, 0, data, 0, header.length);
+                        System.arraycopy(body, 0, data, header.length, body.length);
+                    } else {
+                        data = layout.toByteArray(event);
+                    }
+                } else {
+                    data = StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8);
+                }
+                manager.send(data);
+            } catch (final Exception e) {
+                LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
+                throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
+            }
+        }
+    }
+
+    @Override
+    public void start() {
+        super.start();
+        manager.startup();
+    }
+
+    @Override
+    public boolean stop(final long timeout, final TimeUnit timeUnit) {
+        setStopping();
+        boolean stopped = super.stop(timeout, timeUnit, false);
+        stopped &= manager.stop(timeout, timeUnit);
+        setStopped();
+        return stopped;
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaAppender{" +
+            "name=" + getName() +
+            ", state=" + getState() +
+            ", topic=" + manager.getTopic() +
+            '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/78fe32c0/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
index d991d7e..c5347a2 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
@@ -1,79 +1,80 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache license, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the license for the specific language governing permissions and
- * limitations under the license.
- */
-
-package org.apache.logging.log4j.core.appender.mom.kafka;
-
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.logging.log4j.core.LoggerContext;
-import org.apache.logging.log4j.core.appender.AbstractManager;
-import org.apache.logging.log4j.core.config.Property;
-
-public class KafkaManager extends AbstractManager {
-
-    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
-
-    /**
-     * package-private access for testing.
-     */
-    static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory();
-
-    private final Properties config = new Properties();
-    private Producer<byte[], byte[]> producer;
-    private final int timeoutMillis;
-
-    private final String topic;
-
-    public KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final Property[] properties) {
-        super(loggerContext, name);
-        this.topic = topic;
-        config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-        config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-        config.setProperty("batch.size", "0");
-        for (final Property property : properties) {
-            config.setProperty(property.getName(), property.getValue());
-        }
-        this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
-    }
-
-    @Override
-    public void releaseSub(final long timeout, final TimeUnit timeUnit) {
-        if (producer != null) {
-            producer.close(timeout, timeUnit);
-        }
-    }
-
-    public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
-        if (producer != null) {
-            producer.send(new ProducerRecord<byte[], byte[]>(topic, msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
-        }
-    }
-
-    public void startup() {
-        producer = producerFactory.newKafkaProducer(config);
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.AbstractManager;
+import org.apache.logging.log4j.core.config.Property;
+
+public class KafkaManager extends AbstractManager {
+
+    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
+
+    /**
+     * package-private access for testing.
+     */
+    static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory();
+
+    private final Properties config = new Properties();
+    private Producer<byte[], byte[]> producer;
+    private final int timeoutMillis;
+
+    private final String topic;
+
+    public KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final Property[] properties) {
+        super(loggerContext, name);
+        this.topic = topic;
+        config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        config.setProperty("batch.size", "0");
+        for (final Property property : properties) {
+            config.setProperty(property.getName(), property.getValue());
+        }
+        this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
+    }
+
+    @Override
+    public boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
+        if (producer != null) {
+            producer.close(timeout, timeUnit);
+        }
+        return true;
+    }
+
+    public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
+        if (producer != null) {
+            producer.send(new ProducerRecord<byte[], byte[]>(topic, msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    public void startup() {
+        producer = producerFactory.newKafkaProducer(config);
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+}