You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/02/14 19:00:15 UTC

git commit: Kafka Appender causes Log4j Deadlock; patched by David Arthur; reviewed by Jun Rao; kafka-524

Updated Branches:
  refs/heads/0.7 a475d7c43 -> 4d7629dd7


Kafka Appender causes Log4j Deadlock; patched by David Arthur; reviewed by Jun Rao; kafka-524


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4d7629dd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4d7629dd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4d7629dd

Branch: refs/heads/0.7
Commit: 4d7629dd7f416f94bd148b815518eaa60531e299
Parents: a475d7c
Author: Jun Rao <ju...@gmail.com>
Authored: Thu Feb 14 09:59:55 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Feb 14 09:59:55 2013 -0800

----------------------------------------------------------------------
 .../scala/kafka/producer/KafkaLog4jAppender.scala  |   39 ++++++++++++---
 .../unit/kafka/log4j/KafkaLog4jAppenderTest.scala  |    1 +
 2 files changed, 33 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4d7629dd/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
index 417da27..20c49d4 100644
--- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
+++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
@@ -31,15 +31,25 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
   var topic:String = null
   var serializerClass:String = null
   var zkConnect:String = null
+  var zkConnectTimeout:String = null
+  var zkSessionTimeout:String = null
   var brokerList:String = null
   
   private var producer: Producer[String, String] = null
+  private var config : ProducerConfig = null
+  @volatile private var initialized:Boolean = false
 
   def getTopic:String = topic
   def setTopic(topic: String) { this.topic = topic }
 
   def getZkConnect:String = zkConnect
   def setZkConnect(zkConnect: String) { this.zkConnect = zkConnect }
+
+  def getZkConnectTimeout:String = zkConnectTimeout
+  def setZkConnectTimeout(zkConnectTimeout: String) { this.zkConnectTimeout = zkConnectTimeout }
+
+  def getZkSessionTimeout:String = zkSessionTimeout
+  def setZkSessionTimeout(zkSessionTimeout: String) { this.zkSessionTimeout = zkSessionTimeout }
   
   def getBrokerList:String = brokerList
   def setBrokerList(brokerList: String) { this.brokerList = brokerList }
@@ -51,8 +61,17 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
     val connectDiagnostic : mutable.ListBuffer[String] = mutable.ListBuffer();
     // check for config parameter validity
     val props = new Properties()
-    if( zkConnect == null) connectDiagnostic += "zkConnect"
-    else props.put("zk.connect", zkConnect);
+
+    if(zkConnect == null)
+      connectDiagnostic += "zkConnect"
+    else {
+      props.put("zk.connect", zkConnect)
+      if(zkConnectTimeout != null)
+        props.put("zk.connectiontimeout.ms", zkConnectTimeout)
+      if(zkSessionTimeout != null)
+        props.put("zk.sessiontimeout.ms", zkSessionTimeout)
+    }
+
     if( brokerList == null) connectDiagnostic += "brokerList"
     else if( props.isEmpty) props.put("broker.list", brokerList)
     if(props.isEmpty )
@@ -66,13 +85,18 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
       LogLog.warn("Using default encoder - kafka.serializer.StringEncoder")
     }
     props.put("serializer.class", serializerClass)
-    val config : ProducerConfig = new ProducerConfig(props)
-    producer = new Producer[String, String](config)
-    LogLog.debug("Kafka producer connected to " + (if(config.zkConnect == null) config.brokerList else config.zkConnect))
-    LogLog.debug("Logging for topic: " + topic)
+    config = new ProducerConfig(props)
   }
   
   override def append(event: LoggingEvent)  {
+    // AppenderSkeleton#append serialized via AppenderSkeleton#doAppend
+    // so it is safe to do this
+    if(!initialized) {
+      producer = new Producer[String, String](config)
+      LogLog.debug("Kafka producer connected to " + (if(config.zkConnect == null) config.brokerList else config.zkConnect))
+      LogLog.debug("Logging for topic: " + topic)
+      initialized = true
+    }
     val message : String = if( this.layout == null) {
       event.getRenderedMessage
     }
@@ -86,7 +110,8 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
   override def close() {
     if(!this.closed) {
       this.closed = true
-      producer.close()
+      if(initialized)
+        producer.close()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d7629dd/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
index 7f67eb3..1e93a9f 100644
--- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
+++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
@@ -220,6 +220,7 @@ class KafkaLog4jAppenderTest extends JUnitSuite with Logging {
     props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
     props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.ZkConnect", TestZKUtils.zookeeperConnect)
+    props.put("log4j.appender.KAFKA.ZkConnectTimeout", "10000")
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
     props