You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/04/08 19:15:14 UTC

git commit: KAFKA-1366 Multiple Unit Test failures with new producer; reviewed by Neha Narkhede

Repository: kafka
Updated Branches:
  refs/heads/trunk a840c73c3 -> 8f94bc331


KAFKA-1366 Multiple Unit Test failures with new producer; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8f94bc33
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8f94bc33
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8f94bc33

Branch: refs/heads/trunk
Commit: 8f94bc3315e7d469489da55f94e08a51235c01f3
Parents: a840c73
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Tue Apr 8 10:15:01 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Apr 8 10:15:08 2014 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/config/ConfigDef.java   |  5 +-
 .../kafka/producer/KafkaLog4jAppender.scala     | 33 +++++++-----
 .../kafka/log4j/KafkaLog4jAppenderTest.scala    | 54 ++++----------------
 3 files changed, 33 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8f94bc33/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 9ba7ee7..addc906 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -182,7 +182,10 @@ public class ConfigDef {
                     if (value instanceof List)
                         return (List<?>) value;
                     else if (value instanceof String)
-                        return Arrays.asList(trimmed.split("\\s*,\\s*", -1));
+                        if (trimmed.isEmpty())
+                            return Collections.emptyList();
+                        else
+                            return Arrays.asList(trimmed.split("\\s*,\\s*", -1));
                     else
                         throw new ConfigException(name, value, "Expected a comma separated list.");
                 case CLASS:

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f94bc33/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
index 0067a53..6105726 100644
--- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
+++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
@@ -26,36 +26,40 @@ import java.util.{Properties, Date}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 
 class KafkaLog4jAppender extends AppenderSkeleton with Logging {
-  var topic:String = null
-  var brokerList:String = null
-  var compressionCodec:String = null
+  var topic: String = null
+  var brokerList: String = null
+  var compressionType: String = null
   var requiredNumAcks: Int = Int.MaxValue
+  var syncSend: Boolean = false
 
   private var producer: KafkaProducer = null
 
-  def getTopic:String = topic
+  def getTopic: String = topic
   def setTopic(topic: String) { this.topic = topic }
 
-  def getBrokerList:String = brokerList
+  def getBrokerList: String = brokerList
   def setBrokerList(brokerList: String) { this.brokerList = brokerList }
 
-  def getCompressionCodec:String = compressionCodec
-  def setCompressionCodec(compressionCodec:String) { this.compressionCodec = compressionCodec }
+  def getCompressionType: String = compressionType
+  def setCompressionType(compressionType: String) { this.compressionType = compressionType }
 
-  def getRequiredNumAcks:Int = requiredNumAcks
-  def setRequiredNumAcks(requiredNumAcks:Int) { this.requiredNumAcks = requiredNumAcks }
+  def getRequiredNumAcks: Int = requiredNumAcks
+  def setRequiredNumAcks(requiredNumAcks: Int) { this.requiredNumAcks = requiredNumAcks }
+
+  def getSyncSend: Boolean = syncSend
+  def setSyncSend(syncSend: Boolean) { this.syncSend = syncSend }
 
   override def activateOptions() {
     // check for config parameter validity
     val props = new Properties()
     if(brokerList != null)
-      props.put("metadata.broker.list", brokerList)
+      props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     if(props.isEmpty)
-      throw new MissingConfigException("The metadata.broker.list property should be specified")
+      throw new MissingConfigException("The bootstrap servers property should be specified")
     if(topic == null)
       throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
-    if(compressionCodec != null) props.put("compression.codec", compressionCodec)
-    if(requiredNumAcks != Int.MaxValue) props.put("request.required.acks", requiredNumAcks.toString)
+    if(compressionType != null) props.put(org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
+    if(requiredNumAcks != Int.MaxValue) props.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, requiredNumAcks.toString)
     producer = new KafkaProducer(props)
     LogLog.debug("Kafka producer connected to " +  brokerList)
     LogLog.debug("Logging for topic: " + topic)
@@ -64,7 +68,8 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
   override def append(event: LoggingEvent)  {
     val message = subAppend(event)
     LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message)
-    producer.send(new ProducerRecord(topic, message.getBytes()));
+    val response = producer.send(new ProducerRecord(topic, message.getBytes()))
+    if (syncSend) response.get
   }
 
   def subAppend(event: LoggingEvent): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f94bc33/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
index 67497dd..bbfb01e 100644
--- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
+++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
@@ -55,7 +55,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     val logDirZkPath = propsZk.getProperty("log.dir")
     logDirZk = new File(logDirZkPath)
     config = new KafkaConfig(propsZk)
-    server = TestUtils.createServer(config);
+    server = TestUtils.createServer(config)
     simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024, "")
   }
 
@@ -69,16 +69,15 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
 
   @Test
   def testKafkaLog4jConfigs() {
+    // host missing
     var props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
     props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
     props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
-    props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
 
-    // port missing
     try {
       PropertyConfigurator.configure(props)
       fail("Missing properties exception was expected !")
@@ -86,55 +85,21 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
       case e: MissingConfigException =>
     }
 
+    // topic missing
     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
     props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
     props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
-    props.put("log4j.appender.KAFKA.Topic", "test-topic")
-    props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
-    props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
-
-    // host missing
-    try {
-      PropertyConfigurator.configure(props)
-      fail("Missing properties exception was expected !")
-    }catch {
-      case e: MissingConfigException =>
-    }
-
-    props = new Properties()
-    props.put("log4j.rootLogger", "INFO")
-    props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
-    props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
 
-    // topic missing
     try {
       PropertyConfigurator.configure(props)
       fail("Missing properties exception was expected !")
     }catch {
       case e: MissingConfigException =>
     }
-
-    props = new Properties()
-    props.put("log4j.rootLogger", "INFO")
-    props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
-    props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
-    props.put("log4j.appender.KAFKA.Topic", "test-topic")
-    props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
-
-    // serializer missing
-    try {
-      PropertyConfigurator.configure(props)
-    }catch {
-      case e: MissingConfigException => fail("should default to kafka.serializer.StringEncoder")
-    }
   }
 
   @Test
@@ -156,15 +121,16 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
   }
 
   private def getLog4jConfig: Properties = {
-    var props = new Properties()
+    val props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
-    props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.BrokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
-    props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
-    props.put("log4j.appender.KAFKA.requiredNumAcks", "1")
+    props.put("log4j.appender.KAFKA.RequiredNumAcks", "1")
+    props.put("log4j.appender.KAFKA.SyncSend", "true")
+    props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
     props
   }
 }