You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by gg...@apache.org on 2015/08/31 19:58:14 UTC

[2/2] logging-log4j2 git commit: [LOG4J2-1107] New Appender for Apache Kafka.

[LOG4J2-1107] New Appender for Apache Kafka.

Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/53320c02
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/53320c02
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/53320c02

Branch: refs/heads/master
Commit: 53320c02916fdb72e4dfc093de855faabba6ea18
Parents: 546f4d0
Author: ggregory <gg...@apache.org>
Authored: Mon Aug 31 10:58:08 2015 -0700
Committer: ggregory <gg...@apache.org>
Committed: Mon Aug 31 10:58:08 2015 -0700

----------------------------------------------------------------------
 log4j-core/pom.xml                              |    6 +
 .../mom/kafka/DefaultKafkaProducerFactory.java  |   32 +
 .../core/appender/mom/kafka/KafkaAppender.java  |   96 +
 .../core/appender/mom/kafka/KafkaManager.java   |   88 +
 .../mom/kafka/KafkaProducerFactory.java         |   28 +
 log4j-core/src/site/xdoc/index.xml              |    1 +
 .../appender/mom/kafka/KafkaAppenderTest.java   |  101 +
 .../src/test/resources/KafkaAppenderTest.xml    |   34 +
 pom.xml                                         |    7 +-
 src/changes/changes.xml                         |    3 +
 src/site/xdoc/manual/appenders.xml              | 6502 +++++++++---------
 11 files changed, 3688 insertions(+), 3210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/53320c02/log4j-core/pom.xml
----------------------------------------------------------------------
diff --git a/log4j-core/pom.xml b/log4j-core/pom.xml
index 79ab281..92a1023 100644
--- a/log4j-core/pom.xml
+++ b/log4j-core/pom.xml
@@ -108,6 +108,12 @@
       <scope>provided</scope>
       <optional>true</optional>
     </dependency>
+    <!-- Used for Kafka appender -->
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <optional>true</optional>
+    </dependency>
     <!-- Used for compressing to formats other than zip and gz -->
     <dependency>
       <groupId>org.apache.commons</groupId>

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/53320c02/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/DefaultKafkaProducerFactory.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/DefaultKafkaProducerFactory.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/DefaultKafkaProducerFactory.java
new file mode 100644
index 0000000..88c74f4
--- /dev/null
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/DefaultKafkaProducerFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.kafka;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+
+/**
+ * {@link KafkaProducerFactory} implementation that creates a real {@link KafkaProducer} from the supplied
+ * configuration.
+ */
+public class DefaultKafkaProducerFactory implements KafkaProducerFactory {
+
+    /**
+     * Creates a new {@link KafkaProducer} with byte-array keys and values.
+     *
+     * @param config the producer configuration handed unchanged to the {@link KafkaProducer} constructor
+     * @return the newly created producer
+     */
+    @Override
+    public Producer<byte[], byte[]> newKafkaProducer(final Properties config) {
+        final Producer<byte[], byte[]> created = new KafkaProducer<byte[], byte[]>(config);
+        return created;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/53320c02/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
new file mode 100644
index 0000000..2e6c338
--- /dev/null
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.kafka;
+
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.Layout;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.appender.AppenderLoggingException;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginElement;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
+import org.apache.logging.log4j.core.util.Booleans;
+
+/**
+ * Appender to send log events to an Apache Kafka topic.
+ * <p>
+ * Each event is turned into a byte array, either by the configured layout or, when no layout is configured, by
+ * encoding the formatted message as UTF-8, and is then handed to a {@link KafkaManager}. Events whose logger name
+ * starts with {@code org.apache.kafka} are not sent; they are reported as recursive logging instead.
+ * </p>
+ */
+@Plugin(name = "Kafka", category = "Core", elementType = "appender", printObject = true)
+public final class KafkaAppender extends AbstractAppender {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Plugin factory: builds a {@code KafkaAppender} together with the {@link KafkaManager} it writes to.
+     *
+     * @param layout the layout used to serialize events; may be {@code null}, in which case the UTF-8 bytes of the
+     *               formatted message are sent
+     * @param filter the filter passed on to the appender base class
+     * @param name the appender name (required); also used as the manager name
+     * @param ignore the {@code ignoreExceptions} attribute as text, parsed with a default of {@code true}
+     * @param topic the Kafka topic to send to (required)
+     * @param properties additional Kafka producer properties
+     * @return the new appender
+     */
+    @PluginFactory
+    public static KafkaAppender createAppender(
+            @PluginElement("Layout") final Layout<? extends Serializable> layout,
+            @PluginElement("Filter") final Filter filter,
+            @Required(message = "No name provided for KafkaAppender") @PluginAttribute("name") final String name,
+            @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final String ignore,
+            @Required(message = "No topic provided for KafkaAppender") @PluginAttribute("topic") final String topic,
+            @PluginElement("Properties") final Property[] properties) {
+        final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);
+        final KafkaManager kafkaManager = new KafkaManager(name, topic, properties);
+        return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager);
+    }
+
+    private final KafkaManager manager;
+
+    private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter, final boolean ignoreExceptions, final KafkaManager manager) {
+        super(name, filter, layout, ignoreExceptions);
+        this.manager = manager;
+    }
+
+    /**
+     * Sends the event to Kafka through the manager.
+     *
+     * @param event the event to send
+     * @throws AppenderLoggingException if serializing or sending the event fails; the original exception is kept as
+     *         the cause
+     */
+    @Override
+    public void append(final LogEvent event) {
+        final String loggerName = event.getLoggerName();
+        // The logger name may be null, so guard before testing the prefix to avoid a NullPointerException.
+        if (loggerName != null && loggerName.startsWith("org.apache.kafka")) {
+            LOGGER.warn("Recursive logging from [{}] for appender [{}].", loggerName, getName());
+            return;
+        }
+        try {
+            final Layout<? extends Serializable> layout = getLayout();
+            if (layout != null) {
+                manager.send(layout.toByteArray(event));
+            } else {
+                manager.send(event.getMessage().getFormattedMessage().getBytes(StandardCharsets.UTF_8));
+            }
+        } catch (final Exception e) {
+            LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
+            throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Starts the appender and then asks the manager to create its Kafka producer.
+     */
+    @Override
+    public void start() {
+        super.start();
+        manager.startup();
+    }
+
+    /**
+     * Stops the appender and then releases the manager.
+     */
+    @Override
+    public void stop() {
+        super.stop();
+        manager.release();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/53320c02/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
new file mode 100644
index 0000000..64797c8
--- /dev/null
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.logging.log4j.core.appender.AbstractManager;
+import org.apache.logging.log4j.core.config.Property;
+
+/**
+ * Manager that owns the Kafka {@link Producer} used by the Kafka appender: it builds the producer configuration,
+ * creates the producer on {@link #startup()}, sends records to a fixed topic and closes the producer on release.
+ */
+public class KafkaManager extends AbstractManager {
+
+    /**
+     * Default value, in milliseconds, used when the {@code timeout.ms} property is not configured.
+     */
+    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
+
+    /**
+     * package-private access for testing.
+     */
+    static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory();
+
+    private final Properties config = new Properties();
+    private Producer<byte[], byte[]> producer = null;
+    private final int timeoutMillis;
+
+    private final String topic;
+
+    /**
+     * Builds the producer configuration. Byte-array key and value serializers and a {@code batch.size} of
+     * {@code 0} are set first, so the supplied properties can override them.
+     *
+     * @param name the manager name
+     * @param topic the Kafka topic every record is sent to
+     * @param properties additional producer properties; {@code null} is treated as no additional properties
+     * @throws NumberFormatException if the resulting {@code timeout.ms} value is not a valid integer
+     */
+    public KafkaManager(final String name, final String topic, final Property[] properties) {
+        super(name);
+        this.topic = topic;
+        config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        config.setProperty("batch.size", "0");
+        // Programmatic callers may pass null when they have nothing to add to the defaults.
+        if (properties != null) {
+            for (final Property property : properties) {
+                config.setProperty(property.getName(), property.getValue());
+            }
+        }
+        this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
+    }
+
+    /**
+     * Closes the producer, if one was created, waiting at most the configured timeout for the close to finish.
+     */
+    @Override
+    public void releaseSub() {
+        if (producer != null) {
+            // This thread is a workaround for this Kafka issue: https://issues.apache.org/jira/browse/KAFKA-1660
+            final Thread closeThread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    producer.close();
+                }
+            });
+            closeThread.setName("KafkaManager-CloseThread");
+            closeThread.setDaemon(true); // avoid blocking JVM shutdown
+            closeThread.start();
+            try {
+                closeThread.join(timeoutMillis);
+            } catch (final InterruptedException interrupted) {
+                // Stop waiting, but restore the interrupt status so the caller can still observe the interruption.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Sends the message to the configured topic and waits up to the configured timeout for the send to complete.
+     * Does nothing when no producer has been created yet.
+     *
+     * @param msg the record value
+     * @throws ExecutionException if the send failed
+     * @throws InterruptedException if the calling thread was interrupted while waiting
+     * @throws TimeoutException if the send did not complete within the timeout
+     */
+    public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
+        if (producer != null) {
+            producer.send(new ProducerRecord<byte[], byte[]>(topic, msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Creates the producer from the current configuration using {@link #producerFactory}.
+     */
+    public void startup() {
+        producer = producerFactory.newKafkaProducer(config);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/53320c02/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaProducerFactory.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaProducerFactory.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaProducerFactory.java
new file mode 100644
index 0000000..d9c56be
--- /dev/null
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaProducerFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.kafka;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.Producer;
+
+/**
+ * Factory for the Kafka {@link Producer} instances used by the Kafka manager.
+ */
+public interface KafkaProducerFactory {
+
+    /**
+     * Creates a producer with byte-array keys and values.
+     *
+     * @param config the producer configuration
+     * @return the producer to send records with
+     */
+    Producer<byte[], byte[]> newKafkaProducer(Properties config);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/53320c02/log4j-core/src/site/xdoc/index.xml
----------------------------------------------------------------------
diff --git a/log4j-core/src/site/xdoc/index.xml b/log4j-core/src/site/xdoc/index.xml
index e01d9fc..2884cdd 100644
--- a/log4j-core/src/site/xdoc/index.xml
+++ b/log4j-core/src/site/xdoc/index.xml
@@ -48,6 +48,7 @@
                 <li>SMTPAppender requires Javax Mail.</li>
                 <li>JMSQueueAppender and JMSTopicAppender require a JMS implementation like
                 <a href="http://activemq.apache.org/">Apache ActiveMQ</a>.</li>
+                <li>Kafka appender requires <a href="http://search.maven.org/#artifactdetails|org.apache.kafka|kafka-clients|0.8.2.1|jar">Kafka client library</a>.</li>
                 <li>Windows color support requires <a href="http://jansi.fusesource.org/">Jansi</a>.</li>
                 <li>The JDBC Appender requires a JDBC driver for the database you choose to write events to.</li>
                 <li>The JPA Appender requires the Java Persistence API classes, a JPA provider implementation,

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/53320c02/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
new file mode 100644
index 0000000..d9544e3
--- /dev/null
+++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.junit.LoggerContextRule;
+import org.apache.logging.log4j.message.SimpleMessage;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests the Kafka appender against a {@link MockProducer} that is installed in place of the default producer
+ * factory, using the appenders declared in {@code KafkaAppenderTest.xml}.
+ */
+public class KafkaAppenderTest {
+
+    private static final MockProducer kafka = new MockProducer();
+
+    private static final String LOG_MESSAGE = "Hello, world!";
+    private static final String TOPIC_NAME = "kafka-topic";
+
+    /**
+     * Builds an INFO event carrying {@link #LOG_MESSAGE}, with this test class as logger name and FQCN.
+     */
+    private static Log4jLogEvent createLogEvent() {
+        final String className = KafkaAppenderTest.class.getName();
+        return Log4jLogEvent.newBuilder()
+            .setLoggerName(className)
+            .setLoggerFqcn(className)
+            .setLevel(Level.INFO)
+            .setMessage(new SimpleMessage(LOG_MESSAGE))
+            .build();
+    }
+
+    /**
+     * Asserts that the mock producer recorded exactly one key-less record for {@link #TOPIC_NAME} whose value,
+     * decoded as UTF-8, equals the expected text.
+     */
+    private static void assertSingleRecord(final String expectedValue) {
+        final List<ProducerRecord<byte[], byte[]>> history = kafka.history();
+        assertEquals(1, history.size());
+        final ProducerRecord<byte[], byte[]> item = history.get(0);
+        assertNotNull(item);
+        assertEquals(TOPIC_NAME, item.topic());
+        assertNull(item.key());
+        assertEquals(expectedValue, new String(item.value(), StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Replaces the manager's producer factory with one that always returns the shared mock producer.
+     */
+    @BeforeClass
+    public static void setUpClass() throws Exception {
+        final KafkaProducerFactory mockFactory = new KafkaProducerFactory() {
+            @Override
+            public Producer<byte[], byte[]> newKafkaProducer(final Properties config) {
+                return kafka;
+            }
+        };
+        KafkaManager.producerFactory = mockFactory;
+    }
+
+    @Rule
+    public LoggerContextRule ctx = new LoggerContextRule("KafkaAppenderTest.xml");
+
+    /**
+     * Clears the mock producer before every test so that each test only sees its own records.
+     */
+    @Before
+    public void setUp() throws Exception {
+        kafka.clear();
+    }
+
+    /**
+     * Without a layout the record value is the plain log message.
+     */
+    @Test
+    public void testAppend() throws Exception {
+        final Appender appender = ctx.getRequiredAppender("KafkaAppender");
+        appender.append(createLogEvent());
+        assertSingleRecord(LOG_MESSAGE);
+    }
+
+    /**
+     * With the configured pattern layout the record value is the message wrapped in square brackets.
+     */
+    @Test
+    public void testAppendWithLayout() throws Exception {
+        final Appender appender = ctx.getRequiredAppender("KafkaAppenderWithLayout");
+        appender.append(createLogEvent());
+        assertSingleRecord("[" + LOG_MESSAGE + "]");
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/53320c02/log4j-core/src/test/resources/KafkaAppenderTest.xml
----------------------------------------------------------------------
diff --git a/log4j-core/src/test/resources/KafkaAppenderTest.xml b/log4j-core/src/test/resources/KafkaAppenderTest.xml
new file mode 100644
index 0000000..758c426
--- /dev/null
+++ b/log4j-core/src/test/resources/KafkaAppenderTest.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache license, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the license for the specific language governing permissions and
+  ~ limitations under the license.
+  -->
+<!-- Test configuration for KafkaAppenderTest: two Kafka appenders writing to the same topic. -->
+<Configuration name="KafkaAppenderTest" status="OFF">
+  <Appenders>
+    <!-- No layout configured: the appender sends the UTF-8 bytes of the formatted message. -->
+    <Kafka name="KafkaAppender" topic="kafka-topic">
+      <Property name="bootstrap.servers">localhost:9092</Property>
+    </Kafka>
+    <!-- Pattern layout configured: the message is wrapped in square brackets. -->
+    <Kafka name="KafkaAppenderWithLayout" topic="kafka-topic">
+      <PatternLayout pattern="[%m]"/>
+      <Property name="bootstrap.servers">localhost:9092</Property>
+    </Kafka>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="KafkaAppender"/>
+      <AppenderRef ref="KafkaAppenderWithLayout"/>
+    </Root>
+  </Loggers>
+</Configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/53320c02/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 03f3362..9a74e24 100644
--- a/pom.xml
+++ b/pom.xml
@@ -559,7 +559,12 @@
           </exclusion>
         </exclusions>
       </dependency>
- 	    <dependency>
+      <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka-clients</artifactId>
+        <version>0.8.2.1</version>
+      </dependency>
+      <dependency>
         <groupId>javax.servlet</groupId>
         <artifactId>servlet-api</artifactId>
         <version>2.5</version>

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/53320c02/src/changes/changes.xml
----------------------------------------------------------------------
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index 76d4af9..7bf9bd9 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -31,6 +31,9 @@
         Added support for Java 8 lambda expressions to lazily construct a log message only if
               the requested log level is enabled.
       </action>
+      <action issue="LOG4J2-1107" dev="ggregory" type="add" due-to="Mikael Ståldal">
+        New Appender for Apache Kafka.
+      </action>
       <action issue="LOG4J2-812" dev="rgoers" type="update">
         PatternLayout timestamp formatting performance improvement: replaced synchronized SimpleDateFormat with
         Apache Commons FastDateFormat. This and better caching resulted in a ~3-30X faster timestamp formatting.