You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2012/08/14 22:35:48 UTC

svn commit: r1373067 - /incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala

Author: jkreps
Date: Tue Aug 14 20:35:48 2012
New Revision: 1373067

URL: http://svn.apache.org/viewvc?rev=1373067&view=rev
Log:
KAFKA-323 Add more parameters to the log4j appender. Patch by Jose Quinteiro.


Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala?rev=1373067&r1=1373066&r2=1373067&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala Tue Aug 14 20:35:48 2012
@@ -28,7 +28,11 @@ class KafkaLog4jAppender extends Appende
   var topic:String = null
   var serializerClass:String = null
   var brokerList:String = null
-  
+  var producerType:String = null
+  var compressionCodec:String = null
+  var enqueueTimeout:String = null
+  var queueSize:String = null
+
   private var producer: Producer[String, String] = null
 
   def getTopic:String = topic
@@ -40,6 +44,18 @@ class KafkaLog4jAppender extends Appende
   def getSerializerClass:String = serializerClass
   def setSerializerClass(serializerClass:String) { this.serializerClass = serializerClass }
 
+  def getProducerType:String = producerType
+  def setProducerType(producerType:String) { this.producerType = producerType }
+  
+  def getCompressionCodec:String = compressionCodec
+  def setCompressionCodec(compressionCodec:String) { this.compressionCodec = compressionCodec }
+  
+  def getEnqueueTimeout:String = enqueueTimeout
+  def setEnqueueTimeout(enqueueTimeout:String) { this.enqueueTimeout = enqueueTimeout }
+
+  def getQueueSize:String = queueSize
+  def setQueueSize(queueSize:String) { this.queueSize = queueSize }
+
   override def activateOptions() {
     // check for config parameter validity
     val props = new Properties()
@@ -54,6 +70,11 @@ class KafkaLog4jAppender extends Appende
       LogLog.debug("Using default encoder - kafka.serializer.StringEncoder")
     }
     props.put("serializer.class", serializerClass)
+    //These have default values in ProducerConfig and AsyncProducerConfig. We don't care if they're not specified
+    if(producerType != null) props.put("producer.type", producerType)
+    if(compressionCodec != null) props.put("compression.codec", compressionCodec)
+    if(enqueueTimeout != null) props.put("queue.enqueueTimeout.ms", enqueueTimeout)
+    if(queueSize != null) props.put("queue.size", queueSize)
     val config : ProducerConfig = new ProducerConfig(props)
     producer = new Producer[String, String](config)
     LogLog.debug("Kafka producer connected to " +  config.brokerList)