You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/02/14 19:00:15 UTC
git commit: Kafka Appender causes Log4j Deadlock;
patched by David Arthur; reviewed by Jun Rao; kafka-524
Updated Branches:
refs/heads/0.7 a475d7c43 -> 4d7629dd7
Kafka Appender causes Log4j Deadlock; patched by David Arthur; reviewed by Jun Rao; kafka-524
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4d7629dd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4d7629dd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4d7629dd
Branch: refs/heads/0.7
Commit: 4d7629dd7f416f94bd148b815518eaa60531e299
Parents: a475d7c
Author: Jun Rao <ju...@gmail.com>
Authored: Thu Feb 14 09:59:55 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Feb 14 09:59:55 2013 -0800
----------------------------------------------------------------------
.../scala/kafka/producer/KafkaLog4jAppender.scala | 39 ++++++++++++---
.../unit/kafka/log4j/KafkaLog4jAppenderTest.scala | 1 +
2 files changed, 33 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d7629dd/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
index 417da27..20c49d4 100644
--- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
+++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
@@ -31,15 +31,25 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
var topic:String = null
var serializerClass:String = null
var zkConnect:String = null
+ var zkConnectTimeout:String = null
+ var zkSessionTimeout:String = null
var brokerList:String = null
private var producer: Producer[String, String] = null
+ private var config : ProducerConfig = null
+ @volatile private var initialized:Boolean = false
def getTopic:String = topic
def setTopic(topic: String) { this.topic = topic }
def getZkConnect:String = zkConnect
def setZkConnect(zkConnect: String) { this.zkConnect = zkConnect }
+
+ def getZkConnectTimeout:String = zkConnectTimeout
+ def setZkConnectTimeout(zkConnectTimeout: String) { this.zkConnectTimeout = zkConnectTimeout }
+
+ def getZkSessionTimeout:String = zkSessionTimeout
+ def setZkSessionTimeout(zkSessionTimeout: String) { this.zkSessionTimeout = zkSessionTimeout }
def getBrokerList:String = brokerList
def setBrokerList(brokerList: String) { this.brokerList = brokerList }
@@ -51,8 +61,17 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
val connectDiagnostic : mutable.ListBuffer[String] = mutable.ListBuffer();
// check for config parameter validity
val props = new Properties()
- if( zkConnect == null) connectDiagnostic += "zkConnect"
- else props.put("zk.connect", zkConnect);
+
+ if(zkConnect == null)
+ connectDiagnostic += "zkConnect"
+ else {
+ props.put("zk.connect", zkConnect)
+ if(zkConnectTimeout != null)
+ props.put("zk.connectiontimeout.ms", zkConnectTimeout)
+ if(zkSessionTimeout != null)
+ props.put("zk.sessiontimeout.ms", zkSessionTimeout)
+ }
+
if( brokerList == null) connectDiagnostic += "brokerList"
else if( props.isEmpty) props.put("broker.list", brokerList)
if(props.isEmpty )
@@ -66,13 +85,18 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
LogLog.warn("Using default encoder - kafka.serializer.StringEncoder")
}
props.put("serializer.class", serializerClass)
- val config : ProducerConfig = new ProducerConfig(props)
- producer = new Producer[String, String](config)
- LogLog.debug("Kafka producer connected to " + (if(config.zkConnect == null) config.brokerList else config.zkConnect))
- LogLog.debug("Logging for topic: " + topic)
+ config = new ProducerConfig(props)
}
override def append(event: LoggingEvent) {
+ // Calls to AppenderSkeleton#append are serialized by AppenderSkeleton#doAppend,
+ // so it is safe to lazily create the producer here on the first append
+ if(!initialized) {
+ producer = new Producer[String, String](config)
+ LogLog.debug("Kafka producer connected to " + (if(config.zkConnect == null) config.brokerList else config.zkConnect))
+ LogLog.debug("Logging for topic: " + topic)
+ initialized = true
+ }
val message : String = if( this.layout == null) {
event.getRenderedMessage
}
@@ -86,7 +110,8 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
override def close() {
if(!this.closed) {
this.closed = true
- producer.close()
+ if(initialized)
+ producer.close()
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d7629dd/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
index 7f67eb3..1e93a9f 100644
--- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
+++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
@@ -220,6 +220,7 @@ class KafkaLog4jAppenderTest extends JUnitSuite with Logging {
props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
props.put("log4j.appender.KAFKA.ZkConnect", TestZKUtils.zookeeperConnect)
+ props.put("log4j.appender.KAFKA.ZkConnectTimeout", "10000")
props.put("log4j.appender.KAFKA.Topic", "test-topic")
props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
props