You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2012/08/14 22:35:48 UTC
svn commit: r1373067 -
/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
Author: jkreps
Date: Tue Aug 14 20:35:48 2012
New Revision: 1373067
URL: http://svn.apache.org/viewvc?rev=1373067&view=rev
Log:
KAFKA-323 Add more parameters to the log4j appender. Patch by Jose Quinteiro.
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala?rev=1373067&r1=1373066&r2=1373067&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala Tue Aug 14 20:35:48 2012
@@ -28,7 +28,11 @@ class KafkaLog4jAppender extends Appende
var topic:String = null
var serializerClass:String = null
var brokerList:String = null
-
+ var producerType:String = null
+ var compressionCodec:String = null
+ var enqueueTimeout:String = null
+ var queueSize:String = null
+
private var producer: Producer[String, String] = null
def getTopic:String = topic
@@ -40,6 +44,18 @@ class KafkaLog4jAppender extends Appende
def getSerializerClass:String = serializerClass
def setSerializerClass(serializerClass:String) { this.serializerClass = serializerClass }
+ def getProducerType:String = producerType
+ def setProducerType(producerType:String) { this.producerType = producerType }
+
+ def getCompressionCodec:String = compressionCodec
+ def setCompressionCodec(compressionCodec:String) { this.compressionCodec = compressionCodec }
+
+ def getEnqueueTimeout:String = enqueueTimeout
+ def setEnqueueTimeout(enqueueTimeout:String) { this.enqueueTimeout = enqueueTimeout }
+
+ def getQueueSize:String = queueSize
+ def setQueueSize(queueSize:String) { this.queueSize = queueSize }
+
override def activateOptions() {
// check for config parameter validity
val props = new Properties()
@@ -54,6 +70,11 @@ class KafkaLog4jAppender extends Appende
LogLog.debug("Using default encoder - kafka.serializer.StringEncoder")
}
props.put("serializer.class", serializerClass)
+ //These have default values in ProducerConfig and AsyncProducerConfig. We don't care if they're not specified
+ if(producerType != null) props.put("producer.type", producerType)
+ if(compressionCodec != null) props.put("compression.codec", compressionCodec)
+ if(enqueueTimeout != null) props.put("queue.enqueueTimeout.ms", enqueueTimeout)
+ if(queueSize != null) props.put("queue.size", queueSize)
val config : ProducerConfig = new ProducerConfig(props)
producer = new Producer[String, String](config)
LogLog.debug("Kafka producer connected to " + config.brokerList)