You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by gg...@apache.org on 2015/08/31 19:58:14 UTC
[2/2] logging-log4j2 git commit: [LOG4J2-1107] New Appender for
Apache Kafka.
[LOG4J2-1107] New Appender for Apache Kafka.
Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/53320c02
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/53320c02
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/53320c02
Branch: refs/heads/master
Commit: 53320c02916fdb72e4dfc093de855faabba6ea18
Parents: 546f4d0
Author: ggregory <gg...@apache.org>
Authored: Mon Aug 31 10:58:08 2015 -0700
Committer: ggregory <gg...@apache.org>
Committed: Mon Aug 31 10:58:08 2015 -0700
----------------------------------------------------------------------
log4j-core/pom.xml | 6 +
.../mom/kafka/DefaultKafkaProducerFactory.java | 32 +
.../core/appender/mom/kafka/KafkaAppender.java | 96 +
.../core/appender/mom/kafka/KafkaManager.java | 88 +
.../mom/kafka/KafkaProducerFactory.java | 28 +
log4j-core/src/site/xdoc/index.xml | 1 +
.../appender/mom/kafka/KafkaAppenderTest.java | 101 +
.../src/test/resources/KafkaAppenderTest.xml | 34 +
pom.xml | 7 +-
src/changes/changes.xml | 3 +
src/site/xdoc/manual/appenders.xml | 6502 +++++++++---------
11 files changed, 3688 insertions(+), 3210 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/53320c02/log4j-core/pom.xml
----------------------------------------------------------------------
diff --git a/log4j-core/pom.xml b/log4j-core/pom.xml
index 79ab281..92a1023 100644
--- a/log4j-core/pom.xml
+++ b/log4j-core/pom.xml
@@ -108,6 +108,12 @@
<scope>provided</scope>
<optional>true</optional>
</dependency>
+ <!-- Used for Kafka appender -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <optional>true</optional>
+ </dependency>
<!-- Used for compressing to formats other than zip and gz -->
<dependency>
<groupId>org.apache.commons</groupId>
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/53320c02/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/DefaultKafkaProducerFactory.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/DefaultKafkaProducerFactory.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/DefaultKafkaProducerFactory.java
new file mode 100644
index 0000000..88c74f4
--- /dev/null
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/DefaultKafkaProducerFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.kafka;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+
+public class DefaultKafkaProducerFactory implements KafkaProducerFactory {
+
+ @Override
+ public Producer<byte[], byte[]> newKafkaProducer(final Properties config) {
+ return new KafkaProducer<>(config);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/53320c02/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
new file mode 100644
index 0000000..2e6c338
--- /dev/null
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.kafka;
+
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.Layout;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.appender.AppenderLoggingException;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginElement;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
+import org.apache.logging.log4j.core.util.Booleans;
+
+/**
+ * Appender to send log events to an Apache Kafka topic.
+ */
+@Plugin(name = "Kafka", category = "Core", elementType = "appender", printObject = true)
+public final class KafkaAppender extends AbstractAppender {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ @PluginFactory
+ public static KafkaAppender createAppender(
+ @PluginElement("Layout") final Layout<? extends Serializable> layout,
+ @PluginElement("Filter") final Filter filter,
+ @Required(message = "No name provided for KafkaAppender") @PluginAttribute("name") final String name,
+ @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final String ignore,
+ @Required(message = "No topic provided for KafkaAppender") @PluginAttribute("topic") final String topic,
+ @PluginElement("Properties") final Property[] properties) {
+ final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);
+ final KafkaManager kafkaManager = new KafkaManager(name, topic, properties);
+ return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager);
+ }
+
+ private final KafkaManager manager;
+
+ private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter, final boolean ignoreExceptions, final KafkaManager manager) {
+ super(name, filter, layout, ignoreExceptions);
+ this.manager = manager;
+ }
+
+ @Override
+ public void append(final LogEvent event) {
+ if (event.getLoggerName().startsWith("org.apache.kafka")) {
+ LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
+ } else {
+ try {
+ if (getLayout() != null) {
+ manager.send(getLayout().toByteArray(event));
+ } else {
+ manager.send(event.getMessage().getFormattedMessage().getBytes(StandardCharsets.UTF_8));
+ }
+ } catch (final Exception e) {
+ LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
+ throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ manager.startup();
+ }
+
+ @Override
+ public void stop() {
+ super.stop();
+ manager.release();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/53320c02/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
new file mode 100644
index 0000000..64797c8
--- /dev/null
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.logging.log4j.core.appender.AbstractManager;
+import org.apache.logging.log4j.core.config.Property;
+
+public class KafkaManager extends AbstractManager {
+
+ public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
+
+ /**
+ * package-private access for testing.
+ */
+ static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory();
+
+ private final Properties config = new Properties();
+ private Producer<byte[], byte[]> producer = null;
+ private final int timeoutMillis;
+
+ private final String topic;
+
+ public KafkaManager(final String name, final String topic, final Property[] properties) {
+ super(name);
+ this.topic = topic;
+ config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+ config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+ config.setProperty("batch.size", "0");
+ for (final Property property : properties) {
+ config.setProperty(property.getName(), property.getValue());
+ }
+ this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
+ }
+
+ @Override
+ public void releaseSub() {
+ if (producer != null) {
+ // This thread is a workaround for this Kafka issue: https://issues.apache.org/jira/browse/KAFKA-1660
+ final Thread closeThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ producer.close();
+ }
+ });
+ closeThread.setName("KafkaManager-CloseThread");
+ closeThread.setDaemon(true); // avoid blocking JVM shutdown
+ closeThread.start();
+ try {
+ closeThread.join(timeoutMillis);
+ } catch (final InterruptedException ignore) {
+ // ignore
+ }
+ }
+ }
+
+ public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
+ if (producer != null) {
+ producer.send(new ProducerRecord<byte[], byte[]>(topic, msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ public void startup() {
+ producer = producerFactory.newKafkaProducer(config);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/53320c02/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaProducerFactory.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaProducerFactory.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaProducerFactory.java
new file mode 100644
index 0000000..d9c56be
--- /dev/null
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaProducerFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.kafka;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.Producer;
+
+public interface KafkaProducerFactory {
+
+ Producer<byte[], byte[]> newKafkaProducer(Properties config);
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/53320c02/log4j-core/src/site/xdoc/index.xml
----------------------------------------------------------------------
diff --git a/log4j-core/src/site/xdoc/index.xml b/log4j-core/src/site/xdoc/index.xml
index e01d9fc..2884cdd 100644
--- a/log4j-core/src/site/xdoc/index.xml
+++ b/log4j-core/src/site/xdoc/index.xml
@@ -48,6 +48,7 @@
<li>SMTPAppender requires Javax Mail.</li>
<li>JMSQueueAppender and JMSTopicAppender require a JMS implementation like
<a href="http://activemq.apache.org/">Apache ActiveMQ</a>.</li>
+ <li>Kafka appender requires <a href="http://search.maven.org/#artifactdetails|org.apache.kafka|kafka-clients|0.8.2.1|jar">Kafka client library</a></li>
<li>Windows color support requires <a href="http://jansi.fusesource.org/">Jansi</a>.</li>
<li>The JDBC Appender requires a JDBC driver for the database you choose to write events to.</li>
<li>The JPA Appender requires the Java Persistence API classes, a JPA provider implementation,
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/53320c02/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
new file mode 100644
index 0000000..d9544e3
--- /dev/null
+++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.junit.LoggerContextRule;
+import org.apache.logging.log4j.message.SimpleMessage;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class KafkaAppenderTest {
+
+ private static final MockProducer kafka = new MockProducer();
+
+ private static final String LOG_MESSAGE = "Hello, world!";
+ private static final String TOPIC_NAME = "kafka-topic";
+
+ private static Log4jLogEvent createLogEvent() {
+ return Log4jLogEvent.newBuilder()
+ .setLoggerName(KafkaAppenderTest.class.getName())
+ .setLoggerFqcn(KafkaAppenderTest.class.getName())
+ .setLevel(Level.INFO)
+ .setMessage(new SimpleMessage(LOG_MESSAGE))
+ .build();
+ }
+
+ @BeforeClass
+ public static void setUpClass() throws Exception {
+ KafkaManager.producerFactory = new KafkaProducerFactory() {
+ @Override
+ public Producer<byte[], byte[]> newKafkaProducer(final Properties config) {
+ return kafka;
+ }
+ };
+ }
+
+ @Rule
+ public LoggerContextRule ctx = new LoggerContextRule("KafkaAppenderTest.xml");
+
+ @Before
+ public void setUp() throws Exception {
+ kafka.clear();
+ }
+
+ @Test
+ public void testAppend() throws Exception {
+ final Appender appender = ctx.getRequiredAppender("KafkaAppender");
+ appender.append(createLogEvent());
+ final List<ProducerRecord<byte[], byte[]>> history = kafka.history();
+ assertEquals(1, history.size());
+ final ProducerRecord<byte[], byte[]> item = history.get(0);
+ assertNotNull(item);
+ assertEquals(TOPIC_NAME, item.topic());
+ assertNull(item.key());
+ assertEquals(LOG_MESSAGE, new String(item.value(), StandardCharsets.UTF_8));
+ }
+
+ @Test
+ public void testAppendWithLayout() throws Exception {
+ final Appender appender = ctx.getRequiredAppender("KafkaAppenderWithLayout");
+ appender.append(createLogEvent());
+ final List<ProducerRecord<byte[], byte[]>> history = kafka.history();
+ assertEquals(1, history.size());
+ final ProducerRecord<byte[], byte[]> item = history.get(0);
+ assertNotNull(item);
+ assertEquals(TOPIC_NAME, item.topic());
+ assertNull(item.key());
+ assertEquals("[" + LOG_MESSAGE + "]", new String(item.value(), StandardCharsets.UTF_8));
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/53320c02/log4j-core/src/test/resources/KafkaAppenderTest.xml
----------------------------------------------------------------------
diff --git a/log4j-core/src/test/resources/KafkaAppenderTest.xml b/log4j-core/src/test/resources/KafkaAppenderTest.xml
new file mode 100644
index 0000000..758c426
--- /dev/null
+++ b/log4j-core/src/test/resources/KafkaAppenderTest.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache license, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the license for the specific language governing permissions and
+ ~ limitations under the license.
+ -->
+<Configuration name="KafkaAppenderTest" status="OFF">
+ <Appenders>
+ <Kafka name="KafkaAppender" topic="kafka-topic">
+ <Property name="bootstrap.servers">localhost:9092</Property>
+ </Kafka>
+ <Kafka name="KafkaAppenderWithLayout" topic="kafka-topic">
+ <PatternLayout pattern="[%m]"/>
+ <Property name="bootstrap.servers">localhost:9092</Property>
+ </Kafka>
+ </Appenders>
+ <Loggers>
+ <Root level="info">
+ <AppenderRef ref="KafkaAppender"/>
+ <AppenderRef ref="KafkaAppenderWithLayout"/>
+ </Root>
+ </Loggers>
+</Configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/53320c02/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 03f3362..9a74e24 100644
--- a/pom.xml
+++ b/pom.xml
@@ -559,7 +559,12 @@
</exclusion>
</exclusions>
</dependency>
- <dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>0.8.2.1</version>
+ </dependency>
+ <dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/53320c02/src/changes/changes.xml
----------------------------------------------------------------------
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index 76d4af9..7bf9bd9 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -31,6 +31,9 @@
Added support for Java 8 lambda expressions to lazily construct a log message only if
the requested log level is enabled.
</action>
+ <action issue="LOG4J2-1107" dev="ggregory" type="add" due-to="Mikael Ståldal">
+ New Appender for Apache Kafka.
+ </action>
<action issue="LOG4J2-812" dev="rgoers" type="update">
PatternLayout timestamp formatting performance improvement: replaced synchronized SimpleDateFormat with
Apache Commons FastDateFormat. This and better caching resulted in a ~3-30X faster timestamp formatting.