You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/07/07 01:36:26 UTC
kafka git commit: kafka-2132; Move Log4J appender to a separate module;
patched by Ashish Singh; reviewed by Gwen Shapira, Aditya Auradkar and Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk fd612a2d5 -> 2d96da05a
kafka-2132; Move Log4J appender to a separate module; patched by Ashish Singh; reviewed by Gwen Shapira, Aditya Auradkar and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2d96da05
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2d96da05
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2d96da05
Branch: refs/heads/trunk
Commit: 2d96da05a0af7847aca5edc6d003a18be7f5216a
Parents: fd612a2
Author: Ashish Singh <as...@cloudera.com>
Authored: Mon Jul 6 16:36:20 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Jul 6 16:36:20 2015 -0700
----------------------------------------------------------------------
build.gradle | 60 +++++--
checkstyle/import-control.xml | 9 +-
.../kafka/producer/KafkaLog4jAppender.scala | 97 -----------
.../kafka/log4j/KafkaLog4jAppenderTest.scala | 143 ----------------
.../kafka/log4jappender/KafkaLog4jAppender.java | 167 +++++++++++++++++++
.../log4jappender/KafkaLog4jAppenderTest.java | 98 +++++++++++
.../log4jappender/MockKafkaLog4jAppender.java | 47 ++++++
settings.gradle | 2 +-
8 files changed, 370 insertions(+), 253 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2d96da05/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 727d7c5..ab86987 100644
--- a/build.gradle
+++ b/build.gradle
@@ -132,7 +132,7 @@ subprojects {
archives srcJar
archives javadocJar
}
-
+
plugins.withType(ScalaPlugin) {
//source jar should also contain scala source:
srcJar.from sourceSets.main.scala
@@ -202,20 +202,20 @@ for ( sv in ['2_9_1', '2_9_2', '2_10_5', '2_11_6'] ) {
}
}
-tasks.create(name: "jarAll", dependsOn: ['jar_core_2_9_1', 'jar_core_2_9_2', 'jar_core_2_10_5', 'jar_core_2_11_6', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar']) {
+tasks.create(name: "jarAll", dependsOn: ['jar_core_2_9_1', 'jar_core_2_9_2', 'jar_core_2_10_5', 'jar_core_2_11_6', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar']) {
}
-tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_9_1', 'srcJar_2_9_2', 'srcJar_2_10_5', 'srcJar_2_11_6', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar']) { }
+tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_9_1', 'srcJar_2_9_2', 'srcJar_2_10_5', 'srcJar_2_11_6', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar']) { }
-tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_9_1', 'docsJar_2_9_2', 'docsJar_2_10_5', 'docsJar_2_11_6', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar']) { }
+tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_9_1', 'docsJar_2_9_2', 'docsJar_2_10_5', 'docsJar_2_11_6', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar']) { }
-tasks.create(name: "testAll", dependsOn: ['test_core_2_9_1', 'test_core_2_9_2', 'test_core_2_10_5', 'test_core_2_11_6', 'clients:test']) {
+tasks.create(name: "testAll", dependsOn: ['test_core_2_9_1', 'test_core_2_9_2', 'test_core_2_10_5', 'test_core_2_11_6', 'clients:test', 'log4j-appender:test']) {
}
tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_9_1', 'releaseTarGz_2_9_2', 'releaseTarGz_2_10_5', 'releaseTarGz_2_11_6']) {
}
-tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_9_1', 'uploadCoreArchives_2_9_2', 'uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_6', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives']) {
+tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_9_1', 'uploadCoreArchives_2_9_2', 'uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_6', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives']) {
}
project(':core') {
@@ -228,6 +228,7 @@ project(':core') {
dependencies {
compile project(':clients')
+ compile project(':log4j-appender')
compile "org.scala-lang:scala-library:$scalaVersion"
compile 'org.apache.zookeeper:zookeeper:3.4.6'
compile 'com.101tec:zkclient:0.5'
@@ -237,7 +238,6 @@ project(':core') {
testCompile 'junit:junit:4.6'
testCompile 'org.easymock:easymock:3.0'
testCompile 'org.objenesis:objenesis:1.2'
- testCompile project(':clients')
if (scalaVersion.startsWith('2.10')) {
testCompile 'org.scalatest:scalatest_2.10:1.9.1'
} else if (scalaVersion.startsWith('2.11')) {
@@ -273,9 +273,9 @@ project(':core') {
into "$buildDir/dependant-libs-${scalaVersion}"
}
- tasks.create(name: "releaseTarGz", dependsOn: configurations.archives.artifacts, type: Tar) {
+ tasks.create(name: "releaseTarGz", dependsOn: configurations.archives.artifacts, type: Tar) {
into "kafka_${baseScalaVersion}-${version}"
- compression = Compression.GZIP
+ compression = Compression.GZIP
from(project.file("../bin")) { into "bin/" }
from(project.file("../config")) { into "config/" }
from '../LICENSE'
@@ -378,7 +378,7 @@ project(':clients') {
compile 'org.xerial.snappy:snappy-java:1.1.1.7'
compile 'net.jpountz.lz4:lz4:1.2.0'
- testCompile 'com.novocode:junit-interface:0.9'
+ testCompile 'junit:junit:4.6'
testRuntime "$slf4jlog4j"
}
@@ -405,7 +405,45 @@ project(':clients') {
artifacts {
archives testJar
}
-
+
+ configurations {
+ archives.extendsFrom (testCompile)
+ }
+
+ checkstyle {
+ configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+ }
+ test.dependsOn('checkstyleMain', 'checkstyleTest')
+}
+
+project(':log4j-appender') {
+ apply plugin: 'checkstyle'
+ archivesBaseName = "kafka-log4j-appender"
+
+ dependencies {
+ compile project(':clients')
+ compile "$slf4jlog4j"
+
+ testCompile 'junit:junit:4.6'
+ testCompile project(path: ':clients', configuration: 'archives')
+ }
+
+ task testJar(type: Jar) {
+ classifier = 'test'
+ from sourceSets.test.output
+ }
+
+ test {
+ testLogging {
+ events "passed", "skipped", "failed"
+ exceptionFormat = 'full'
+ }
+ }
+
+ javadoc {
+ include "**/org/apache/kafka/log4jappender/*"
+ }
+
checkstyle {
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2d96da05/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index f2e6cec..19e0659 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -95,8 +95,15 @@
</subpackage>
</subpackage>
+ <subpackage name="log4jappender">
+ <allow pkg="org.apache.log4j" />
+ <allow pkg="org.apache.kafka.clients" />
+ <allow pkg="org.apache.kafka.common" />
+ <allow pkg="org.apache.kafka.test" />
+ </subpackage>
+
<subpackage name="test">
<allow pkg="org.apache.kafka" />
</subpackage>
-
+
</import-control>
http://git-wip-us.apache.org/repos/asf/kafka/blob/2d96da05/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
deleted file mode 100644
index 5d36a01..0000000
--- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.producer
-
-import async.MissingConfigException
-import org.apache.log4j.spi.LoggingEvent
-import org.apache.log4j.AppenderSkeleton
-import org.apache.log4j.helpers.LogLog
-import kafka.utils.Logging
-import java.util.{Properties, Date}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-
-class KafkaLog4jAppender extends AppenderSkeleton with Logging {
- var topic: String = null
- var brokerList: String = null
- var compressionType: String = null
- var retries: Int = 0
- var requiredNumAcks: Int = Int.MaxValue
- var syncSend: Boolean = false
-
- private var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
-
- def getTopic: String = topic
- def setTopic(topic: String) { this.topic = topic }
-
- def getBrokerList: String = brokerList
- def setBrokerList(brokerList: String) { this.brokerList = brokerList }
-
- def getCompressionType: String = compressionType
- def setCompressionType(compressionType: String) { this.compressionType = compressionType }
-
- def getRequiredNumAcks: Int = requiredNumAcks
- def setRequiredNumAcks(requiredNumAcks: Int) { this.requiredNumAcks = requiredNumAcks }
-
- def getSyncSend: Boolean = syncSend
- def setSyncSend(syncSend: Boolean) { this.syncSend = syncSend }
-
- def getRetries: Int = retries
- def setRetries(retries: Int) { this.retries = retries }
-
- override def activateOptions() {
- // check for config parameter validity
- val props = new Properties()
- if(brokerList != null)
- props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
- if(props.isEmpty)
- throw new MissingConfigException("The bootstrap servers property should be specified")
- if(topic == null)
- throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
- if(compressionType != null) props.put(org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
- if(requiredNumAcks != Int.MaxValue) props.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, requiredNumAcks.toString)
- if(retries > 0) props.put(org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG, retries.toString)
- props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
- props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
- producer = new KafkaProducer[Array[Byte],Array[Byte]](props)
- LogLog.debug("Kafka producer connected to " + brokerList)
- LogLog.debug("Logging for topic: " + topic)
- }
-
- override def append(event: LoggingEvent) {
- val message = subAppend(event)
- LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message)
- val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message.getBytes()))
- if (syncSend) response.get
- }
-
- def subAppend(event: LoggingEvent): String = {
- if(this.layout == null)
- event.getRenderedMessage
- else
- this.layout.format(event)
- }
-
- override def close() {
- if(!this.closed) {
- this.closed = true
- producer.close()
- }
- }
-
- override def requiresLayout: Boolean = true
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2d96da05/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
deleted file mode 100755
index 41366a1..0000000
--- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log4j
-
-import kafka.consumer.SimpleConsumer
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{TestUtils, CoreUtils, Logging}
-import kafka.api.FetchRequestBuilder
-import kafka.producer.async.MissingConfigException
-import kafka.serializer.Encoder
-import kafka.zk.ZooKeeperTestHarness
-
-import java.util.Properties
-import java.io.File
-
-import org.apache.log4j.spi.LoggingEvent
-import org.apache.log4j.{PropertyConfigurator, Logger}
-import org.junit.{After, Before, Test}
-import org.scalatest.junit.JUnit3Suite
-
-import junit.framework.Assert._
-
-class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
-
- var logDirZk: File = null
- var config: KafkaConfig = null
- var server: KafkaServer = null
-
- var simpleConsumerZk: SimpleConsumer = null
-
- val tLogger = Logger.getLogger(getClass())
-
- private val brokerZk = 0
-
- @Before
- override def setUp() {
- super.setUp()
-
- val propsZk = TestUtils.createBrokerConfig(brokerZk, zkConnect)
- val logDirZkPath = propsZk.getProperty("log.dir")
- logDirZk = new File(logDirZkPath)
- config = KafkaConfig.fromProps(propsZk)
- server = TestUtils.createServer(config)
- simpleConsumerZk = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64 * 1024, "")
- }
-
- @After
- override def tearDown() {
- simpleConsumerZk.close
- server.shutdown
- CoreUtils.rm(logDirZk)
- super.tearDown()
- }
-
- @Test
- def testKafkaLog4jConfigs() {
- // host missing
- var props = new Properties()
- props.put("log4j.rootLogger", "INFO")
- props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
- props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
- props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
- props.put("log4j.appender.KAFKA.Topic", "test-topic")
- props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
-
- try {
- PropertyConfigurator.configure(props)
- fail("Missing properties exception was expected !")
- } catch {
- case e: MissingConfigException =>
- }
-
- // topic missing
- props = new Properties()
- props.put("log4j.rootLogger", "INFO")
- props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
- props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
- props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
- props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromServers(Seq(server)))
- props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
-
- try {
- PropertyConfigurator.configure(props)
- fail("Missing properties exception was expected !")
- } catch {
- case e: MissingConfigException =>
- }
- }
-
- @Test
- def testLog4jAppends() {
- PropertyConfigurator.configure(getLog4jConfig)
-
- for(i <- 1 to 5)
- info("test")
-
- val response = simpleConsumerZk.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 1024*1024).build())
- val fetchMessage = response.messageSet("test-topic", 0)
-
- var count = 0
- for(message <- fetchMessage) {
- count = count + 1
- }
-
- assertEquals(5, count)
- }
-
- private def getLog4jConfig: Properties = {
- val props = new Properties()
- props.put("log4j.rootLogger", "INFO")
- props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
- props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
- props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
- props.put("log4j.appender.KAFKA.BrokerList", TestUtils.getBrokerListStrFromServers(Seq(server)))
- props.put("log4j.appender.KAFKA.Topic", "test-topic")
- props.put("log4j.appender.KAFKA.RequiredNumAcks", "1")
- props.put("log4j.appender.KAFKA.SyncSend", "true")
- props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
- props
- }
-}
-
-class AppenderStringEncoder(encoding: String = "UTF-8") extends Encoder[LoggingEvent] {
- def toBytes(event: LoggingEvent): Array[Byte] = {
- event.getMessage.toString.getBytes(encoding)
- }
-}
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/2d96da05/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
----------------------------------------------------------------------
diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
new file mode 100644
index 0000000..628ff53
--- /dev/null
+++ b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.log4jappender;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.util.Date;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * A log4j appender that produces log messages to Kafka
+ */
+public class KafkaLog4jAppender extends AppenderSkeleton {
+
+ private static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
+ private static final String COMPRESSION_TYPE_CONFIG = "compression.type";
+ private static final String ACKS_CONFIG = "acks";
+ private static final String RETRIES_CONFIG = "retries";
+ private static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
+ private static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
+
+ private String brokerList = null;
+ private String topic = null;
+ private String compressionType = null;
+
+ private int retries = 0;
+ private int requiredNumAcks = Integer.MAX_VALUE;
+ private boolean syncSend = false;
+ private Producer<byte[], byte[]> producer = null;
+
+ public Producer<byte[], byte[]> getProducer() {
+ return producer;
+ }
+
+ public String getBrokerList() {
+ return brokerList;
+ }
+
+ public void setBrokerList(String brokerList) {
+ this.brokerList = brokerList;
+ }
+
+ public int getRequiredNumAcks() {
+ return requiredNumAcks;
+ }
+
+ public void setRequiredNumAcks(int requiredNumAcks) {
+ this.requiredNumAcks = requiredNumAcks;
+ }
+
+ public int getRetries() {
+ return retries;
+ }
+
+ public void setRetries(int retries) {
+ this.retries = retries;
+ }
+
+ public String getCompressionType() {
+ return compressionType;
+ }
+
+ public void setCompressionType(String compressionType) {
+ this.compressionType = compressionType;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public boolean getSyncSend() {
+ return syncSend;
+ }
+
+ public void setSyncSend(boolean syncSend) {
+ this.syncSend = syncSend;
+ }
+
+ @Override
+ public void activateOptions() {
+ // check for config parameter validity
+ Properties props = new Properties();
+ if (brokerList != null)
+ props.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
+ if (props.isEmpty())
+ throw new ConfigException("The bootstrap servers property should be specified");
+ if (topic == null)
+ throw new ConfigException("Topic must be specified by the Kafka log4j appender");
+ if (compressionType != null)
+ props.put(COMPRESSION_TYPE_CONFIG, compressionType);
+ if (requiredNumAcks != Integer.MAX_VALUE)
+ props.put(ACKS_CONFIG, requiredNumAcks);
+ if (retries > 0)
+ props.put(RETRIES_CONFIG, retries);
+
+ props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ this.producer = getKafkaProducer(props);
+ LogLog.debug("Kafka producer connected to " + brokerList);
+ LogLog.debug("Logging for topic: " + topic);
+ }
+
+ protected Producer<byte[], byte[]> getKafkaProducer(Properties props) {
+ return new KafkaProducer<byte[], byte[]>(props);
+ }
+
+ @Override
+ protected void append(LoggingEvent event) {
+ String message = subAppend(event);
+ LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message);
+ Future<RecordMetadata> response = producer.send(new ProducerRecord<byte[], byte[]>(topic, message.getBytes()));
+ if (syncSend) {
+ try {
+ response.get();
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ } catch (ExecutionException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ private String subAppend(LoggingEvent event) {
+ return (this.layout == null) ? event.getRenderedMessage() : this.layout.format(event);
+ }
+
+ @Override
+ public void close() {
+ if (!this.closed) {
+ this.closed = true;
+ producer.close();
+ }
+ }
+
+ @Override
+ public boolean requiresLayout() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2d96da05/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
----------------------------------------------------------------------
diff --git a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
new file mode 100644
index 0000000..71bdd94
--- /dev/null
+++ b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.log4jappender;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PropertyConfigurator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Properties;
+
+public class KafkaLog4jAppenderTest {
+
+ Logger logger = Logger.getLogger(KafkaLog4jAppenderTest.class);
+
+ @Test
+ public void testKafkaLog4jConfigs() {
+ // host missing
+ Properties props = new Properties();
+ props.put("log4j.rootLogger", "INFO");
+ props.put("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.KafkaLog4jAppender");
+ props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout");
+ props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
+ props.put("log4j.appender.KAFKA.Topic", "test-topic");
+ props.put("log4j.logger.kafka.log4j", "INFO, KAFKA");
+
+ try {
+ PropertyConfigurator.configure(props);
+ Assert.fail("Missing properties exception was expected !");
+ } catch (ConfigException ex) {
+ // It's OK!
+ }
+
+ // topic missing
+ props = new Properties();
+ props.put("log4j.rootLogger", "INFO");
+ props.put("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.KafkaLog4jAppender");
+ props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout");
+ props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
+ props.put("log4j.appender.KAFKA.brokerList", "127.0.0.1:9093");
+ props.put("log4j.logger.kafka.log4j", "INFO, KAFKA");
+
+ try {
+ PropertyConfigurator.configure(props);
+ Assert.fail("Missing properties exception was expected !");
+ } catch (ConfigException ex) {
+ // It's OK!
+ }
+ }
+
+
+ @Test
+ public void testLog4jAppends() throws UnsupportedEncodingException {
+ PropertyConfigurator.configure(getLog4jConfig());
+
+ for (int i = 1; i <= 5; ++i) {
+ logger.error(getMessage(i));
+ }
+
+ Assert.assertEquals(
+ 5, ((MockKafkaLog4jAppender) (logger.getRootLogger().getAppender("KAFKA"))).getHistory().size());
+ }
+
+ private byte[] getMessage(int i) throws UnsupportedEncodingException {
+ return ("test_" + i).getBytes("UTF-8");
+ }
+
+ private Properties getLog4jConfig() {
+ Properties props = new Properties();
+ props.put("log4j.rootLogger", "INFO, KAFKA");
+ props.put("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.MockKafkaLog4jAppender");
+ props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout");
+ props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
+ props.put("log4j.appender.KAFKA.BrokerList", "127.0.0.1:9093");
+ props.put("log4j.appender.KAFKA.Topic", "test-topic");
+ props.put("log4j.appender.KAFKA.RequiredNumAcks", "1");
+ props.put("log4j.appender.KAFKA.SyncSend", "false");
+ props.put("log4j.logger.kafka.log4j", "INFO, KAFKA");
+ return props;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/2d96da05/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java
----------------------------------------------------------------------
diff --git a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java
new file mode 100644
index 0000000..c35f26a
--- /dev/null
+++ b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.log4jappender;
+
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.test.MockSerializer;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.util.Properties;
+
+public class MockKafkaLog4jAppender extends KafkaLog4jAppender {
+ private MockProducer<byte[], byte[]> mockProducer =
+ new MockProducer<byte[], byte[]>(false, new MockSerializer(), new MockSerializer());
+
+ @Override
+ protected Producer<byte[], byte[]> getKafkaProducer(Properties props) {
+ return mockProducer;
+ }
+
+ @Override
+ protected void append(LoggingEvent event) {
+ if (super.getProducer() == null) {
+ activateOptions();
+ }
+ super.append(event);
+ }
+
+ protected java.util.List<ProducerRecord<byte[], byte[]>> getHistory() {
+ return mockProducer.history();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2d96da05/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 83f764e..3b6a952 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -14,4 +14,4 @@
// limitations under the License.
apply from: file('scala.gradle')
-include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients'
+include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'log4j-appender'