You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/04/08 19:15:14 UTC
git commit: KAFKA-1366 Multiple Unit Test failures with new producer; reviewed by Neha Narkhede
Repository: kafka
Updated Branches:
refs/heads/trunk a840c73c3 -> 8f94bc331
KAFKA-1366 Multiple Unit Test failures with new producer; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8f94bc33
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8f94bc33
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8f94bc33
Branch: refs/heads/trunk
Commit: 8f94bc3315e7d469489da55f94e08a51235c01f3
Parents: a840c73
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Tue Apr 8 10:15:01 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Apr 8 10:15:08 2014 -0700
----------------------------------------------------------------------
.../apache/kafka/common/config/ConfigDef.java | 5 +-
.../kafka/producer/KafkaLog4jAppender.scala | 33 +++++++-----
.../kafka/log4j/KafkaLog4jAppenderTest.scala | 54 ++++----------------
3 files changed, 33 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8f94bc33/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 9ba7ee7..addc906 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -182,7 +182,10 @@ public class ConfigDef {
if (value instanceof List)
return (List<?>) value;
else if (value instanceof String)
- return Arrays.asList(trimmed.split("\\s*,\\s*", -1));
+ if (trimmed.isEmpty())
+ return Collections.emptyList();
+ else
+ return Arrays.asList(trimmed.split("\\s*,\\s*", -1));
else
throw new ConfigException(name, value, "Expected a comma separated list.");
case CLASS:
http://git-wip-us.apache.org/repos/asf/kafka/blob/8f94bc33/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
index 0067a53..6105726 100644
--- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
+++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
@@ -26,36 +26,40 @@ import java.util.{Properties, Date}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
class KafkaLog4jAppender extends AppenderSkeleton with Logging {
- var topic:String = null
- var brokerList:String = null
- var compressionCodec:String = null
+ var topic: String = null
+ var brokerList: String = null
+ var compressionType: String = null
var requiredNumAcks: Int = Int.MaxValue
+ var syncSend: Boolean = false
private var producer: KafkaProducer = null
- def getTopic:String = topic
+ def getTopic: String = topic
def setTopic(topic: String) { this.topic = topic }
- def getBrokerList:String = brokerList
+ def getBrokerList: String = brokerList
def setBrokerList(brokerList: String) { this.brokerList = brokerList }
- def getCompressionCodec:String = compressionCodec
- def setCompressionCodec(compressionCodec:String) { this.compressionCodec = compressionCodec }
+ def getCompressionType: String = compressionType
+ def setCompressionType(compressionType: String) { this.compressionType = compressionType }
- def getRequiredNumAcks:Int = requiredNumAcks
- def setRequiredNumAcks(requiredNumAcks:Int) { this.requiredNumAcks = requiredNumAcks }
+ def getRequiredNumAcks: Int = requiredNumAcks
+ def setRequiredNumAcks(requiredNumAcks: Int) { this.requiredNumAcks = requiredNumAcks }
+
+ def getSyncSend: Boolean = syncSend
+ def setSyncSend(syncSend: Boolean) { this.syncSend = syncSend }
override def activateOptions() {
// check for config parameter validity
val props = new Properties()
if(brokerList != null)
- props.put("metadata.broker.list", brokerList)
+ props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
if(props.isEmpty)
- throw new MissingConfigException("The metadata.broker.list property should be specified")
+ throw new MissingConfigException("The bootstrap servers property should be specified")
if(topic == null)
throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
- if(compressionCodec != null) props.put("compression.codec", compressionCodec)
- if(requiredNumAcks != Int.MaxValue) props.put("request.required.acks", requiredNumAcks.toString)
+ if(compressionType != null) props.put(org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
+ if(requiredNumAcks != Int.MaxValue) props.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, requiredNumAcks.toString)
producer = new KafkaProducer(props)
LogLog.debug("Kafka producer connected to " + brokerList)
LogLog.debug("Logging for topic: " + topic)
@@ -64,7 +68,8 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
override def append(event: LoggingEvent) {
val message = subAppend(event)
LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message)
- producer.send(new ProducerRecord(topic, message.getBytes()));
+ val response = producer.send(new ProducerRecord(topic, message.getBytes()))
+ if (syncSend) response.get
}
def subAppend(event: LoggingEvent): String = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/8f94bc33/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
index 67497dd..bbfb01e 100644
--- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
+++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
@@ -55,7 +55,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
val logDirZkPath = propsZk.getProperty("log.dir")
logDirZk = new File(logDirZkPath)
config = new KafkaConfig(propsZk)
- server = TestUtils.createServer(config);
+ server = TestUtils.createServer(config)
simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024, "")
}
@@ -69,16 +69,15 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
@Test
def testKafkaLog4jConfigs() {
+ // host missing
var props = new Properties()
props.put("log4j.rootLogger", "INFO")
props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
props.put("log4j.appender.KAFKA.Topic", "test-topic")
- props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
- // port missing
try {
PropertyConfigurator.configure(props)
fail("Missing properties exception was expected !")
@@ -86,55 +85,21 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
case e: MissingConfigException =>
}
+ // topic missing
props = new Properties()
props.put("log4j.rootLogger", "INFO")
props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
- props.put("log4j.appender.KAFKA.Topic", "test-topic")
- props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
- props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
-
- // host missing
- try {
- PropertyConfigurator.configure(props)
- fail("Missing properties exception was expected !")
- }catch {
- case e: MissingConfigException =>
- }
-
- props = new Properties()
- props.put("log4j.rootLogger", "INFO")
- props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
- props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
- props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
- props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
- // topic missing
try {
PropertyConfigurator.configure(props)
fail("Missing properties exception was expected !")
}catch {
case e: MissingConfigException =>
}
-
- props = new Properties()
- props.put("log4j.rootLogger", "INFO")
- props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
- props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
- props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
- props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
- props.put("log4j.appender.KAFKA.Topic", "test-topic")
- props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
-
- // serializer missing
- try {
- PropertyConfigurator.configure(props)
- }catch {
- case e: MissingConfigException => fail("should default to kafka.serializer.StringEncoder")
- }
}
@Test
@@ -156,15 +121,16 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
}
private def getLog4jConfig: Properties = {
- var props = new Properties()
+ val props = new Properties()
props.put("log4j.rootLogger", "INFO")
props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
- props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
- props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
- props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
+ props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+ props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
+ props.put("log4j.appender.KAFKA.BrokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
props.put("log4j.appender.KAFKA.Topic", "test-topic")
- props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
- props.put("log4j.appender.KAFKA.requiredNumAcks", "1")
+ props.put("log4j.appender.KAFKA.RequiredNumAcks", "1")
+ props.put("log4j.appender.KAFKA.SyncSend", "true")
+ props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
props
}
}