You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by rp...@apache.org on 2016/09/08 14:51:14 UTC
[45/50] [abbrv] logging-log4j2 git commit: Properly track resources
during shutdown.
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/78fe32c0/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
index 5a60653..96b2f57 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java
@@ -1,185 +1,185 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache license, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the license for the specific language governing permissions and
- * limitations under the license.
- */
-
-package org.apache.logging.log4j.core.appender.mom.jeromq;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.logging.log4j.core.Appender;
-import org.apache.logging.log4j.core.Filter;
-import org.apache.logging.log4j.core.Layout;
-import org.apache.logging.log4j.core.LogEvent;
-import org.apache.logging.log4j.core.appender.AbstractAppender;
-import org.apache.logging.log4j.core.config.Node;
-import org.apache.logging.log4j.core.config.Property;
-import org.apache.logging.log4j.core.config.plugins.Plugin;
-import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
-import org.apache.logging.log4j.core.config.plugins.PluginElement;
-import org.apache.logging.log4j.core.config.plugins.PluginFactory;
-import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
-import org.apache.logging.log4j.core.layout.PatternLayout;
-import org.apache.logging.log4j.util.Strings;
-
-/**
- * Sends log events to one or more ZeroMQ (JeroMQ) endpoints.
- * <p>
- * Requires the JeroMQ jar (LGPL as of 0.3.5)
- * </p>
- */
-// TODO
-// Some methods are synchronized because a ZMQ.Socket is not thread-safe
-// Using a ThreadLocal for the publisher hangs tests on shutdown. There must be
-// some issue on threads owning certain resources as opposed to others.
-@Plugin(name = "JeroMQ", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
-public final class JeroMqAppender extends AbstractAppender {
-
- private static final int DEFAULT_BACKLOG = 100;
-
- private static final int DEFAULT_IVL = 100;
-
- private static final int DEFAULT_RCV_HWM = 1000;
-
- private static final int DEFAULT_SND_HWM = 1000;
-
- private final JeroMqManager manager;
- private final List<String> endpoints;
- private int sendRcFalse;
- private int sendRcTrue;
-
- private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
- final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog,
- final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger,
- final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut,
- final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut,
- final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
- final long tcpKeepAliveInterval, final boolean xpubVerbose) {
- super(name, filter, layout, ignoreExceptions);
- this.manager = JeroMqManager.getJeroMqManager(name, affinity, backlog, delayAttachOnConnect, identity, ipv4Only,
- linger, maxMsgSize, rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax,
- sendBufferSize, sendTimeOut, sndHWM, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle,
- tcpKeepAliveInterval, xpubVerbose, endpoints);
- this.endpoints = endpoints;
- }
-
- // The ZMQ.Socket class has other set methods that we do not cover because
- // they throw unsupported operation exceptions.
- @PluginFactory
- public static JeroMqAppender createAppender(
- // @formatter:off
- @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name,
- @PluginElement("Layout") Layout<?> layout,
- @PluginElement("Filter") final Filter filter,
- @PluginElement("Properties") final Property[] properties,
- // Super attributes
- @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions,
- // ZMQ attributes; defaults picked from zmq.Options.
- @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity,
- @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog,
- @PluginAttribute(value = "delayAttachOnConnect", defaultBoolean = false) final boolean delayAttachOnConnect,
- @PluginAttribute(value = "identity") final byte[] identity,
- @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only,
- @PluginAttribute(value = "linger", defaultLong = -1) final long linger,
- @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize,
- @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm,
- @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize,
- @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut,
- @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL,
- @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax,
- @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize,
- @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut,
- @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm,
- @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive,
- @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount,
- @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle,
- @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval,
- @PluginAttribute(value = "xpubVerbose", defaultBoolean = false) final boolean xpubVerbose
- // @formatter:on
- ) {
- if (layout == null) {
- layout = PatternLayout.createDefaultLayout();
- }
- List<String> endpoints;
- if (properties == null) {
- endpoints = new ArrayList<>(0);
- } else {
- endpoints = new ArrayList<>(properties.length);
- for (final Property property : properties) {
- if ("endpoint".equalsIgnoreCase(property.getName())) {
- final String value = property.getValue();
- if (Strings.isNotEmpty(value)) {
- endpoints.add(value);
- }
- }
- }
- }
- LOGGER.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}",
- name, filter, layout, ignoreExceptions, endpoints);
- return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog,
- delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize,
- receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive,
- tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose);
- }
-
- @Override
- public synchronized void append(final LogEvent event) {
- final Layout<? extends Serializable> layout = getLayout();
- final byte[] formattedMessage = layout.toByteArray(event);
- if (manager.send(getLayout().toByteArray(event))) {
- sendRcTrue++;
- } else {
- sendRcFalse++;
- LOGGER.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, formattedMessage);
- }
- }
-
- @Override
- public boolean stop(final long timeout, final TimeUnit timeUnit) {
- setStopping();
- super.stop(timeout, timeUnit, false);
- manager.stop(timeout, timeUnit);
- setStopped();
- return true;
- }
-
- // not public, handy for testing
- int getSendRcFalse() {
- return sendRcFalse;
- }
-
- // not public, handy for testing
- int getSendRcTrue() {
- return sendRcTrue;
- }
-
- // not public, handy for testing
- void resetSendRcs() {
- sendRcTrue = sendRcFalse = 0;
- }
-
- @Override
- public String toString() {
- return "JeroMqAppender{" +
- "name=" + getName() +
- ", state=" + getState() +
- ", manager=" + manager +
- ", endpoints=" + endpoints +
- '}';
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.jeromq;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.Layout;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.config.Node;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginElement;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+import org.apache.logging.log4j.util.Strings;
+
+/**
+ * Sends log events to one or more ZeroMQ (JeroMQ) endpoints.
+ * <p>
+ * Requires the JeroMQ jar (LGPL as of 0.3.5)
+ * </p>
+ */
+// TODO
+// Some methods are synchronized because a ZMQ.Socket is not thread-safe
+// Using a ThreadLocal for the publisher hangs tests on shutdown. There must be
+// some issue on threads owning certain resources as opposed to others.
+@Plugin(name = "JeroMQ", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
+public final class JeroMqAppender extends AbstractAppender {
+
+ private static final int DEFAULT_BACKLOG = 100;
+
+ private static final int DEFAULT_IVL = 100;
+
+ private static final int DEFAULT_RCV_HWM = 1000;
+
+ private static final int DEFAULT_SND_HWM = 1000;
+
+ private final JeroMqManager manager;
+ private final List<String> endpoints;
+ private int sendRcFalse;
+ private int sendRcTrue;
+
+ private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
+ final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog,
+ final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger,
+ final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut,
+ final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut,
+ final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
+ final long tcpKeepAliveInterval, final boolean xpubVerbose) {
+ super(name, filter, layout, ignoreExceptions);
+ this.manager = JeroMqManager.getJeroMqManager(name, affinity, backlog, delayAttachOnConnect, identity, ipv4Only,
+ linger, maxMsgSize, rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax,
+ sendBufferSize, sendTimeOut, sndHWM, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle,
+ tcpKeepAliveInterval, xpubVerbose, endpoints);
+ this.endpoints = endpoints;
+ }
+
+ // The ZMQ.Socket class has other set methods that we do not cover because
+ // they throw unsupported operation exceptions.
+ @PluginFactory
+ public static JeroMqAppender createAppender(
+ // @formatter:off
+ @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name,
+ @PluginElement("Layout") Layout<?> layout,
+ @PluginElement("Filter") final Filter filter,
+ @PluginElement("Properties") final Property[] properties,
+ // Super attributes
+ @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions,
+ // ZMQ attributes; defaults picked from zmq.Options.
+ @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity,
+ @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog,
+ @PluginAttribute(value = "delayAttachOnConnect", defaultBoolean = false) final boolean delayAttachOnConnect,
+ @PluginAttribute(value = "identity") final byte[] identity,
+ @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only,
+ @PluginAttribute(value = "linger", defaultLong = -1) final long linger,
+ @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize,
+ @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm,
+ @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize,
+ @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut,
+ @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL,
+ @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax,
+ @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize,
+ @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut,
+ @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm,
+ @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive,
+ @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount,
+ @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle,
+ @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval,
+ @PluginAttribute(value = "xpubVerbose", defaultBoolean = false) final boolean xpubVerbose
+ // @formatter:on
+ ) {
+ if (layout == null) {
+ layout = PatternLayout.createDefaultLayout();
+ }
+ List<String> endpoints;
+ if (properties == null) {
+ endpoints = new ArrayList<>(0);
+ } else {
+ endpoints = new ArrayList<>(properties.length);
+ for (final Property property : properties) {
+ if ("endpoint".equalsIgnoreCase(property.getName())) {
+ final String value = property.getValue();
+ if (Strings.isNotEmpty(value)) {
+ endpoints.add(value);
+ }
+ }
+ }
+ }
+ LOGGER.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}",
+ name, filter, layout, ignoreExceptions, endpoints);
+ return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog,
+ delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize,
+ receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive,
+ tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose);
+ }
+
+ // Encode the event once and reuse the bytes for the send and the failure log; synchronized because a ZMQ.Socket is not thread-safe.
+ @Override
+ public synchronized void append(final LogEvent event) {
+ final byte[] formattedMessage = getLayout().toByteArray(event);
+ if (manager.send(formattedMessage)) {
+ sendRcTrue++;
+ } else {
+ sendRcFalse++;
+ LOGGER.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, formattedMessage);
+ }
+ }
+
+ @Override
+ public boolean stop(final long timeout, final TimeUnit timeUnit) {
+ setStopping();
+ boolean stopped = super.stop(timeout, timeUnit, false);
+ stopped &= manager.stop(timeout, timeUnit);
+ setStopped();
+ return stopped;
+ }
+
+ // not public, handy for testing
+ int getSendRcFalse() {
+ return sendRcFalse;
+ }
+
+ // not public, handy for testing
+ int getSendRcTrue() {
+ return sendRcTrue;
+ }
+
+ // not public, handy for testing
+ void resetSendRcs() {
+ sendRcTrue = sendRcFalse = 0;
+ }
+
+ @Override
+ public String toString() {
+ return "JeroMqAppender{" +
+ "name=" + getName() +
+ ", state=" + getState() +
+ ", manager=" + manager +
+ ", endpoints=" + endpoints +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/78fe32c0/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
index f731d61..a438faf 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
@@ -1,221 +1,222 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache license, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the license for the specific language governing permissions and
- * limitations under the license.
- */
-
-package org.apache.logging.log4j.core.appender.mom.jeromq;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.core.appender.AbstractManager;
-import org.apache.logging.log4j.core.appender.ManagerFactory;
-import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry;
-import org.apache.logging.log4j.util.PropertiesUtil;
-import org.zeromq.ZMQ;
-
-/**
- * Manager for publishing messages via JeroMq.
- *
- * @since 2.6
- */
-public class JeroMqManager extends AbstractManager {
-
- /**
- * System property to enable shutdown hook.
- */
- public static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook";
-
- /**
- * System property to control JeroMQ I/O thread count.
- */
- public static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
-
- private static final JeroMqManagerFactory FACTORY = new JeroMqManagerFactory();
- private static final ZMQ.Context CONTEXT;
-
- static {
- LOGGER.trace("JeroMqManager using ZMQ version {}", ZMQ.getVersionString());
-
- final int ioThreads = PropertiesUtil.getProperties().getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
- LOGGER.trace("JeroMqManager creating ZMQ context with ioThreads = {}", ioThreads);
- CONTEXT = ZMQ.context(ioThreads);
-
- final boolean enableShutdownHook = PropertiesUtil.getProperties().getBooleanProperty(
- SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
- if (enableShutdownHook) {
- ((ShutdownCallbackRegistry) LogManager.getFactory()).addShutdownCallback(new Runnable() {
- @Override
- public void run() {
- CONTEXT.close();
- }
- });
- }
- }
-
- private final ZMQ.Socket publisher;
-
- private JeroMqManager(final String name, final JeroMqConfiguration config) {
- super(null, name);
- publisher = CONTEXT.socket(ZMQ.PUB);
- publisher.setAffinity(config.affinity);
- publisher.setBacklog(config.backlog);
- publisher.setDelayAttachOnConnect(config.delayAttachOnConnect);
- if (config.identity != null) {
- publisher.setIdentity(config.identity);
- }
- publisher.setIPv4Only(config.ipv4Only);
- publisher.setLinger(config.linger);
- publisher.setMaxMsgSize(config.maxMsgSize);
- publisher.setRcvHWM(config.rcvHwm);
- publisher.setReceiveBufferSize(config.receiveBufferSize);
- publisher.setReceiveTimeOut(config.receiveTimeOut);
- publisher.setReconnectIVL(config.reconnectIVL);
- publisher.setReconnectIVLMax(config.reconnectIVLMax);
- publisher.setSendBufferSize(config.sendBufferSize);
- publisher.setSendTimeOut(config.sendTimeOut);
- publisher.setSndHWM(config.sndHwm);
- publisher.setTCPKeepAlive(config.tcpKeepAlive);
- publisher.setTCPKeepAliveCount(config.tcpKeepAliveCount);
- publisher.setTCPKeepAliveIdle(config.tcpKeepAliveIdle);
- publisher.setTCPKeepAliveInterval(config.tcpKeepAliveInterval);
- publisher.setXpubVerbose(config.xpubVerbose);
- for (final String endpoint : config.endpoints) {
- publisher.bind(endpoint);
- }
- LOGGER.debug("Created JeroMqManager with {}", config);
- }
-
- public boolean send(final byte[] data) {
- return publisher.send(data);
- }
-
- @Override
- protected void releaseSub(final long timeout, final TimeUnit timeUnit) {
- publisher.close();
- }
-
- public static JeroMqManager getJeroMqManager(final String name, final long affinity, final long backlog,
- final boolean delayAttachOnConnect, final byte[] identity,
- final boolean ipv4Only, final long linger, final long maxMsgSize,
- final long rcvHwm, final long receiveBufferSize,
- final int receiveTimeOut, final long reconnectIVL,
- final long reconnectIVLMax, final long sendBufferSize,
- final int sendTimeOut, final long sndHwm, final int tcpKeepAlive,
- final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
- final long tcpKeepAliveInterval, final boolean xpubVerbose,
- final List<String> endpoints) {
- return getManager(name, FACTORY,
- new JeroMqConfiguration(affinity, backlog, delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize,
- rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut,
- sndHwm, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose,
- endpoints));
- }
-
- public static ZMQ.Context getContext() {
- return CONTEXT;
- }
-
- private static class JeroMqConfiguration {
- private final long affinity;
- private final long backlog;
- private final boolean delayAttachOnConnect;
- private final byte[] identity;
- private final boolean ipv4Only;
- private final long linger;
- private final long maxMsgSize;
- private final long rcvHwm;
- private final long receiveBufferSize;
- private final int receiveTimeOut;
- private final long reconnectIVL;
- private final long reconnectIVLMax;
- private final long sendBufferSize;
- private final int sendTimeOut;
- private final long sndHwm;
- private final int tcpKeepAlive;
- private final long tcpKeepAliveCount;
- private final long tcpKeepAliveIdle;
- private final long tcpKeepAliveInterval;
- private final boolean xpubVerbose;
- private final List<String> endpoints;
-
- private JeroMqConfiguration(final long affinity, final long backlog, final boolean delayAttachOnConnect,
- final byte[] identity, final boolean ipv4Only, final long linger,
- final long maxMsgSize, final long rcvHwm, final long receiveBufferSize,
- final int receiveTimeOut, final long reconnectIVL, final long reconnectIVLMax,
- final long sendBufferSize, final int sendTimeOut, final long sndHwm,
- final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
- final long tcpKeepAliveInterval, final boolean xpubVerbose,
- final List<String> endpoints) {
- this.affinity = affinity;
- this.backlog = backlog;
- this.delayAttachOnConnect = delayAttachOnConnect;
- this.identity = identity;
- this.ipv4Only = ipv4Only;
- this.linger = linger;
- this.maxMsgSize = maxMsgSize;
- this.rcvHwm = rcvHwm;
- this.receiveBufferSize = receiveBufferSize;
- this.receiveTimeOut = receiveTimeOut;
- this.reconnectIVL = reconnectIVL;
- this.reconnectIVLMax = reconnectIVLMax;
- this.sendBufferSize = sendBufferSize;
- this.sendTimeOut = sendTimeOut;
- this.sndHwm = sndHwm;
- this.tcpKeepAlive = tcpKeepAlive;
- this.tcpKeepAliveCount = tcpKeepAliveCount;
- this.tcpKeepAliveIdle = tcpKeepAliveIdle;
- this.tcpKeepAliveInterval = tcpKeepAliveInterval;
- this.xpubVerbose = xpubVerbose;
- this.endpoints = endpoints;
- }
-
- @Override
- public String toString() {
- return "JeroMqConfiguration{" +
- "affinity=" + affinity +
- ", backlog=" + backlog +
- ", delayAttachOnConnect=" + delayAttachOnConnect +
- ", identity=" + Arrays.toString(identity) +
- ", ipv4Only=" + ipv4Only +
- ", linger=" + linger +
- ", maxMsgSize=" + maxMsgSize +
- ", rcvHwm=" + rcvHwm +
- ", receiveBufferSize=" + receiveBufferSize +
- ", receiveTimeOut=" + receiveTimeOut +
- ", reconnectIVL=" + reconnectIVL +
- ", reconnectIVLMax=" + reconnectIVLMax +
- ", sendBufferSize=" + sendBufferSize +
- ", sendTimeOut=" + sendTimeOut +
- ", sndHwm=" + sndHwm +
- ", tcpKeepAlive=" + tcpKeepAlive +
- ", tcpKeepAliveCount=" + tcpKeepAliveCount +
- ", tcpKeepAliveIdle=" + tcpKeepAliveIdle +
- ", tcpKeepAliveInterval=" + tcpKeepAliveInterval +
- ", xpubVerbose=" + xpubVerbose +
- ", endpoints=" + endpoints +
- '}';
- }
- }
-
- private static class JeroMqManagerFactory implements ManagerFactory<JeroMqManager, JeroMqConfiguration> {
- @Override
- public JeroMqManager createManager(final String name, final JeroMqConfiguration data) {
- return new JeroMqManager(name, data);
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.jeromq;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.appender.AbstractManager;
+import org.apache.logging.log4j.core.appender.ManagerFactory;
+import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry;
+import org.apache.logging.log4j.util.PropertiesUtil;
+import org.zeromq.ZMQ;
+
+/**
+ * Manager for publishing messages via JeroMq.
+ *
+ * @since 2.6
+ */
+public class JeroMqManager extends AbstractManager {
+
+ /**
+ * System property to enable shutdown hook.
+ */
+ public static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook";
+
+ /**
+ * System property to control JeroMQ I/O thread count.
+ */
+ public static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
+
+ private static final JeroMqManagerFactory FACTORY = new JeroMqManagerFactory();
+ private static final ZMQ.Context CONTEXT;
+
+ static {
+ LOGGER.trace("JeroMqManager using ZMQ version {}", ZMQ.getVersionString());
+
+ final int ioThreads = PropertiesUtil.getProperties().getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
+ LOGGER.trace("JeroMqManager creating ZMQ context with ioThreads = {}", ioThreads);
+ CONTEXT = ZMQ.context(ioThreads);
+
+ final boolean enableShutdownHook = PropertiesUtil.getProperties().getBooleanProperty(
+ SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
+ if (enableShutdownHook) {
+ ((ShutdownCallbackRegistry) LogManager.getFactory()).addShutdownCallback(new Runnable() {
+ @Override
+ public void run() {
+ CONTEXT.close();
+ }
+ });
+ }
+ }
+
+ private final ZMQ.Socket publisher;
+
+ private JeroMqManager(final String name, final JeroMqConfiguration config) {
+ super(null, name);
+ publisher = CONTEXT.socket(ZMQ.PUB);
+ publisher.setAffinity(config.affinity);
+ publisher.setBacklog(config.backlog);
+ publisher.setDelayAttachOnConnect(config.delayAttachOnConnect);
+ if (config.identity != null) {
+ publisher.setIdentity(config.identity);
+ }
+ publisher.setIPv4Only(config.ipv4Only);
+ publisher.setLinger(config.linger);
+ publisher.setMaxMsgSize(config.maxMsgSize);
+ publisher.setRcvHWM(config.rcvHwm);
+ publisher.setReceiveBufferSize(config.receiveBufferSize);
+ publisher.setReceiveTimeOut(config.receiveTimeOut);
+ publisher.setReconnectIVL(config.reconnectIVL);
+ publisher.setReconnectIVLMax(config.reconnectIVLMax);
+ publisher.setSendBufferSize(config.sendBufferSize);
+ publisher.setSendTimeOut(config.sendTimeOut);
+ publisher.setSndHWM(config.sndHwm);
+ publisher.setTCPKeepAlive(config.tcpKeepAlive);
+ publisher.setTCPKeepAliveCount(config.tcpKeepAliveCount);
+ publisher.setTCPKeepAliveIdle(config.tcpKeepAliveIdle);
+ publisher.setTCPKeepAliveInterval(config.tcpKeepAliveInterval);
+ publisher.setXpubVerbose(config.xpubVerbose);
+ for (final String endpoint : config.endpoints) {
+ publisher.bind(endpoint);
+ }
+ LOGGER.debug("Created JeroMqManager with {}", config);
+ }
+
+ public boolean send(final byte[] data) {
+ return publisher.send(data);
+ }
+
+ @Override
+ protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
+ publisher.close();
+ return true;
+ }
+
+ public static JeroMqManager getJeroMqManager(final String name, final long affinity, final long backlog,
+ final boolean delayAttachOnConnect, final byte[] identity,
+ final boolean ipv4Only, final long linger, final long maxMsgSize,
+ final long rcvHwm, final long receiveBufferSize,
+ final int receiveTimeOut, final long reconnectIVL,
+ final long reconnectIVLMax, final long sendBufferSize,
+ final int sendTimeOut, final long sndHwm, final int tcpKeepAlive,
+ final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
+ final long tcpKeepAliveInterval, final boolean xpubVerbose,
+ final List<String> endpoints) {
+ return getManager(name, FACTORY,
+ new JeroMqConfiguration(affinity, backlog, delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize,
+ rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut,
+ sndHwm, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose,
+ endpoints));
+ }
+
+ public static ZMQ.Context getContext() {
+ return CONTEXT;
+ }
+
+ private static class JeroMqConfiguration {
+ private final long affinity;
+ private final long backlog;
+ private final boolean delayAttachOnConnect;
+ private final byte[] identity;
+ private final boolean ipv4Only;
+ private final long linger;
+ private final long maxMsgSize;
+ private final long rcvHwm;
+ private final long receiveBufferSize;
+ private final int receiveTimeOut;
+ private final long reconnectIVL;
+ private final long reconnectIVLMax;
+ private final long sendBufferSize;
+ private final int sendTimeOut;
+ private final long sndHwm;
+ private final int tcpKeepAlive;
+ private final long tcpKeepAliveCount;
+ private final long tcpKeepAliveIdle;
+ private final long tcpKeepAliveInterval;
+ private final boolean xpubVerbose;
+ private final List<String> endpoints;
+
+ private JeroMqConfiguration(final long affinity, final long backlog, final boolean delayAttachOnConnect,
+ final byte[] identity, final boolean ipv4Only, final long linger,
+ final long maxMsgSize, final long rcvHwm, final long receiveBufferSize,
+ final int receiveTimeOut, final long reconnectIVL, final long reconnectIVLMax,
+ final long sendBufferSize, final int sendTimeOut, final long sndHwm,
+ final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
+ final long tcpKeepAliveInterval, final boolean xpubVerbose,
+ final List<String> endpoints) {
+ this.affinity = affinity;
+ this.backlog = backlog;
+ this.delayAttachOnConnect = delayAttachOnConnect;
+ this.identity = identity;
+ this.ipv4Only = ipv4Only;
+ this.linger = linger;
+ this.maxMsgSize = maxMsgSize;
+ this.rcvHwm = rcvHwm;
+ this.receiveBufferSize = receiveBufferSize;
+ this.receiveTimeOut = receiveTimeOut;
+ this.reconnectIVL = reconnectIVL;
+ this.reconnectIVLMax = reconnectIVLMax;
+ this.sendBufferSize = sendBufferSize;
+ this.sendTimeOut = sendTimeOut;
+ this.sndHwm = sndHwm;
+ this.tcpKeepAlive = tcpKeepAlive;
+ this.tcpKeepAliveCount = tcpKeepAliveCount;
+ this.tcpKeepAliveIdle = tcpKeepAliveIdle;
+ this.tcpKeepAliveInterval = tcpKeepAliveInterval;
+ this.xpubVerbose = xpubVerbose;
+ this.endpoints = endpoints;
+ }
+
+ @Override
+ public String toString() {
+ return "JeroMqConfiguration{" +
+ "affinity=" + affinity +
+ ", backlog=" + backlog +
+ ", delayAttachOnConnect=" + delayAttachOnConnect +
+ ", identity=" + Arrays.toString(identity) +
+ ", ipv4Only=" + ipv4Only +
+ ", linger=" + linger +
+ ", maxMsgSize=" + maxMsgSize +
+ ", rcvHwm=" + rcvHwm +
+ ", receiveBufferSize=" + receiveBufferSize +
+ ", receiveTimeOut=" + receiveTimeOut +
+ ", reconnectIVL=" + reconnectIVL +
+ ", reconnectIVLMax=" + reconnectIVLMax +
+ ", sendBufferSize=" + sendBufferSize +
+ ", sendTimeOut=" + sendTimeOut +
+ ", sndHwm=" + sndHwm +
+ ", tcpKeepAlive=" + tcpKeepAlive +
+ ", tcpKeepAliveCount=" + tcpKeepAliveCount +
+ ", tcpKeepAliveIdle=" + tcpKeepAliveIdle +
+ ", tcpKeepAliveInterval=" + tcpKeepAliveInterval +
+ ", xpubVerbose=" + xpubVerbose +
+ ", endpoints=" + endpoints +
+ '}';
+ }
+ }
+
+ private static class JeroMqManagerFactory implements ManagerFactory<JeroMqManager, JeroMqConfiguration> {
+ @Override
+ public JeroMqManager createManager(final String name, final JeroMqConfiguration data) {
+ return new JeroMqManager(name, data);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/78fe32c0/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
index bee9437..787f655 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
@@ -1,120 +1,120 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache license, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the license for the specific language governing permissions and
- * limitations under the license.
- */
-
-package org.apache.logging.log4j.core.appender.mom.kafka;
-
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.logging.log4j.core.Appender;
-import org.apache.logging.log4j.core.Filter;
-import org.apache.logging.log4j.core.Layout;
-import org.apache.logging.log4j.core.LogEvent;
-import org.apache.logging.log4j.core.appender.AbstractAppender;
-import org.apache.logging.log4j.core.appender.AppenderLoggingException;
-import org.apache.logging.log4j.core.config.Configuration;
-import org.apache.logging.log4j.core.config.Node;
-import org.apache.logging.log4j.core.config.Property;
-import org.apache.logging.log4j.core.config.plugins.Plugin;
-import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
-import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
-import org.apache.logging.log4j.core.config.plugins.PluginElement;
-import org.apache.logging.log4j.core.config.plugins.PluginFactory;
-import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
-import org.apache.logging.log4j.core.layout.SerializedLayout;
-import org.apache.logging.log4j.core.util.StringEncoder;
-
-/**
- * Sends log events to an Apache Kafka topic.
- */
-@Plugin(name = "Kafka", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
-public final class KafkaAppender extends AbstractAppender {
-
- @PluginFactory
- public static KafkaAppender createAppender(
- @PluginElement("Layout") final Layout<? extends Serializable> layout,
- @PluginElement("Filter") final Filter filter,
- @Required(message = "No name provided for KafkaAppender") @PluginAttribute("name") final String name,
- @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
- @Required(message = "No topic provided for KafkaAppender") @PluginAttribute("topic") final String topic,
- @PluginElement("Properties") final Property[] properties,
- @PluginConfiguration final Configuration configuration) {
- final KafkaManager kafkaManager = new KafkaManager(configuration.getLoggerContext(), name, topic, properties);
- return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager);
- }
-
- private final KafkaManager manager;
-
- private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter, final boolean ignoreExceptions, final KafkaManager manager) {
- super(name, filter, layout, ignoreExceptions);
- this.manager = manager;
- }
-
- @Override
- public void append(final LogEvent event) {
- if (event.getLoggerName().startsWith("org.apache.kafka")) {
- LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
- } else {
- try {
- final Layout<? extends Serializable> layout = getLayout();
- byte[] data;
- if (layout != null) {
- if (layout instanceof SerializedLayout) {
- final byte[] header = layout.getHeader();
- final byte[] body = layout.toByteArray(event);
- data = new byte[header.length + body.length];
- System.arraycopy(header, 0, data, 0, header.length);
- System.arraycopy(body, 0, data, header.length, body.length);
- } else {
- data = layout.toByteArray(event);
- }
- } else {
- data = StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8);
- }
- manager.send(data);
- } catch (final Exception e) {
- LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
- throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
- }
- }
- }
-
- @Override
- public void start() {
- super.start();
- manager.startup();
- }
-
- @Override
- public boolean stop(final long timeout, final TimeUnit timeUnit) {
- setStopping();
- super.stop(timeout, timeUnit, false);
- manager.stop(timeout, timeUnit);
- setStopped();
- return true;
- }
-
- @Override
- public String toString() {
- return "KafkaAppender{" +
- "name=" + getName() +
- ", state=" + getState() +
- ", topic=" + manager.getTopic() +
- '}';
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.kafka;
+
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.Layout;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.appender.AppenderLoggingException;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.Node;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
+import org.apache.logging.log4j.core.config.plugins.PluginElement;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
+import org.apache.logging.log4j.core.layout.SerializedLayout;
+import org.apache.logging.log4j.core.util.StringEncoder;
+
+/**
+ * Sends log events to an Apache Kafka topic, encoded by the configured layout (UTF-8 message text if none).
+ */
+@Plugin(name = "Kafka", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
+public final class KafkaAppender extends AbstractAppender {
+
+    /** Plugin factory: creates a {@link KafkaManager} for {@code topic} and wraps it in a new appender. */
+    @PluginFactory
+    public static KafkaAppender createAppender(
+            @PluginElement("Layout") final Layout<? extends Serializable> layout,
+            @PluginElement("Filter") final Filter filter,
+            @Required(message = "No name provided for KafkaAppender") @PluginAttribute("name") final String name,
+            @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
+            @Required(message = "No topic provided for KafkaAppender") @PluginAttribute("topic") final String topic,
+            @PluginElement("Properties") final Property[] properties,
+            @PluginConfiguration final Configuration configuration) {
+        final KafkaManager kafkaManager = new KafkaManager(configuration.getLoggerContext(), name, topic, properties);
+        return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager);
+    }
+
+    private final KafkaManager manager;
+
+    private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter, final boolean ignoreExceptions, final KafkaManager manager) {
+        super(name, filter, layout, ignoreExceptions);
+        this.manager = manager;
+    }
+
+    /**
+     * Encodes the event and sends it through the manager. Events from {@code org.apache.kafka} loggers are only
+     * reported with a warning and not sent. Any failure is logged and rethrown as {@link AppenderLoggingException}.
+     */
+    @Override
+    public void append(final LogEvent event) {
+        final String loggerName = event.getLoggerName(); // may be null per the LogEvent contract; then it is sent normally
+        if (loggerName != null && loggerName.startsWith("org.apache.kafka")) {
+            LOGGER.warn("Recursive logging from [{}] for appender [{}].", loggerName, getName());
+            return;
+        }
+        try {
+            final Layout<? extends Serializable> layout = getLayout();
+            final byte[] data;
+            if (layout == null) {
+                data = StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8);
+            } else if (layout instanceof SerializedLayout) {
+                final byte[] header = layout.getHeader(); // the record is the layout header followed by the event bytes
+                final byte[] body = layout.toByteArray(event);
+                data = new byte[header.length + body.length];
+                System.arraycopy(header, 0, data, 0, header.length);
+                System.arraycopy(body, 0, data, header.length, body.length);
+            } else {
+                data = layout.toByteArray(event);
+            }
+            manager.send(data);
+        } catch (final Exception e) {
+            LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
+            throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void start() {
+        super.start();
+        manager.startup();
+    }
+
+    @Override
+    public boolean stop(final long timeout, final TimeUnit timeUnit) {
+        setStopping();
+        boolean stopped = super.stop(timeout, timeUnit, false);
+        stopped &= manager.stop(timeout, timeUnit);
+        setStopped();
+        return stopped; // true only if both the appender and its manager reported a clean stop
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaAppender{name=" + getName() + ", state=" + getState() + ", topic=" + manager.getTopic() + '}';
+    }
+}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/78fe32c0/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
index d991d7e..c5347a2 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
@@ -1,79 +1,80 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache license, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the license for the specific language governing permissions and
- * limitations under the license.
- */
-
-package org.apache.logging.log4j.core.appender.mom.kafka;
-
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.logging.log4j.core.LoggerContext;
-import org.apache.logging.log4j.core.appender.AbstractManager;
-import org.apache.logging.log4j.core.config.Property;
-
-public class KafkaManager extends AbstractManager {
-
- public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
-
- /**
- * package-private access for testing.
- */
- static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory();
-
- private final Properties config = new Properties();
- private Producer<byte[], byte[]> producer;
- private final int timeoutMillis;
-
- private final String topic;
-
- public KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final Property[] properties) {
- super(loggerContext, name);
- this.topic = topic;
- config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
- config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
- config.setProperty("batch.size", "0");
- for (final Property property : properties) {
- config.setProperty(property.getName(), property.getValue());
- }
- this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
- }
-
- @Override
- public void releaseSub(final long timeout, final TimeUnit timeUnit) {
- if (producer != null) {
- producer.close(timeout, timeUnit);
- }
- }
-
- public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
- if (producer != null) {
- producer.send(new ProducerRecord<byte[], byte[]>(topic, msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
- }
- }
-
- public void startup() {
- producer = producerFactory.newKafkaProducer(config);
- }
-
- public String getTopic() {
- return topic;
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.AbstractManager;
+import org.apache.logging.log4j.core.config.Property;
+
+public class KafkaManager extends AbstractManager {
+
+    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
+
+    /** Package-private so that tests can substitute the producer factory. */
+    static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory();
+
+    private final String topic;
+    private final Properties config = new Properties();
+    private final int timeoutMillis;
+    private Producer<byte[], byte[]> producer; // assigned by startup(); null until then
+
+    public KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final Property[] properties) {
+        super(loggerContext, name);
+        this.topic = topic;
+        config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        config.setProperty("batch.size", "0");
+        for (final Property override : properties) { // caller-supplied entries replace the defaults set above
+            config.setProperty(override.getName(), override.getValue());
+        }
+        this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
+    }
+
+    /** Closes the producer, if one was created, passing on the given timeout; always returns {@code true}. */
+    @Override
+    public boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
+        if (producer != null) {
+            producer.close(timeout, timeUnit);
+        }
+        return true;
+    }
+
+    /** Sends {@code msg} to the topic and waits up to {@code timeoutMillis} for the result; does nothing without a producer. */
+    public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
+        if (producer != null) {
+            final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, msg);
+            producer.send(record).get(timeoutMillis, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    public void startup() {
+        producer = producerFactory.newKafkaProducer(config);
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+}