You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Haoming Zhang <ha...@outlook.com> on 2014/11/24 06:29:13 UTC

Exception in thread "main" java.lang.ExceptionInInitializerError



Hi all,

Basically I used a lot of codes from this project https://github.com/stealthly/scala-kafka , my idea is to sent a key/value pair to Kafka, so that I can design a partition function in the further.

I checked the document and seems I should create a ProducerRecord, then I can specify a partition or key. Follows the codes from stealehly's project, I created a test main function as following:
import org.apache.kafka.clients.producer.{ KafkaProducer => NewKafkaProducer }
import org.apache.kafka.clients.producer.ProducerConfig
import java.util.Properties

object Test extends App {
    val testMessage = UUID.randomUUID().toString
    val testTopic = "0e7fa3c2-1b75-407b-a03c-f40679ea3ce9"
    val groupId_1 = UUID.randomUUID().toString
  
    val brokerList: String = "localhost:9092"
    val acks: Int = -1
    val metadataFetchTimeout: Long = 3000L
    val blockOnBufferFull: Boolean = true
    val bufferSize: Long = 1024L * 1024L
    val retries: Int = 0

    val producerProps = new Properties()
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
    producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
    producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString)
    producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString)
    producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
    producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
    producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
    producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")

    val producer = new NewKafkaProducer(producerProps)
    
    val record = new ProducerRecord(testTopic, 0, "key".getBytes, testMessage.getBytes)
    producer.send(record)

    val consumer = new KafkaConsumer(testTopic, groupId_1, "localhost:2181")
  
    def exec(binaryObject: Array[Byte]) = {
      val message = new String(binaryObject)
      print("testMessage = " + testMessage + " and consumed message = " + message)
      consumer.close()
    }
  
    print("KafkaSpec is waiting some seconds")
    consumer.read(exec)
    print("KafkaSpec consumed")
}

The KafkaConsumer class is exactly as the same as stealehly's project: https://github.com/stealthly/scala-kafka/blob/master/src/main/scala/KafkaConsumer.scala

But when I tried to run the test program, I got the following exception:
[2014-11-23 21:15:36,461] INFO ProducerConfig values: 
    block.on.buffer.full = true
    retry.backoff.ms = 100
    buffer.memory = 33554432
    batch.size = 16384
    metrics.sample.window.ms = 30000
    metadata.max.age.ms = 300000
    receive.buffer.bytes = 32768
    timeout.ms = 30000
    max.in.flight.requests.per.connection = 5
    metric.reporters = []
    bootstrap.servers = [localhost:9092]
    client.id = 
    compression.type = none
    retries = 0
    max.request.size = 1048576
    send.buffer.bytes = 131072
    acks = 1
    reconnect.backoff.ms = 10
    linger.ms = 0
    metrics.num.samples = 2
    metadata.fetch.timeout.ms = 60000
 (org.apache.kafka.clients.producer.ProducerConfig)
Exception in thread "main" java.lang.ExceptionInInitializerError
    at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:73)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:243)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:255)
    at org.apache.kafka.clients.producer.KafkaProducer.<clinit>(KafkaProducer.java:64)
    at Test$delayedInit$body.apply(Test.scala:47)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at Test$.main(Test.scala:21)
    at Test.main(Test.scala)
Caused by: java.lang.NullPointerException
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:98)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:94)
    at kafka.producer.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.scala:63)
    at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:257)
    at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:133)
    at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:97)
    at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:689)
    at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:647)
    at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:568)
    at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:442)
    at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:476)
    at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:471)
    at org.apache.log4j.LogManager.<clinit>(LogManager.java:125)
    ... 14 more

I think it should be an error came from ProducerConfig, I spent time on find a solution, but no luck. Does anyone could give me a hit/help please?

Regards,
Haoming

 		 	   		  

Re: Exception in thread "main" java.lang.ExceptionInInitializerError

Posted by Jun Rao <ju...@gmail.com>.
It seems you hit an exception when instantiating the sfl4j logger inside
the producer.

Thanks,

Jun

On Sun, Nov 23, 2014 at 9:29 PM, Haoming Zhang <ha...@outlook.com>
wrote:

>
>
>
> Hi all,
>
> Basically I used a lot of codes from this project
> https://github.com/stealthly/scala-kafka , my idea is to sent a key/value
> pair to Kafka, so that I can design a partition function in the further.
>
> I checked the document and seems I should create a ProducerRecord, then I
> can specify a partition or key. Follows the codes from stealehly's project,
> I created a test main function as following:
> import org.apache.kafka.clients.producer.{ KafkaProducer =>
> NewKafkaProducer }
> import org.apache.kafka.clients.producer.ProducerConfig
> import java.util.Properties
>
> object Test extends App {
>     val testMessage = UUID.randomUUID().toString
>     val testTopic = "0e7fa3c2-1b75-407b-a03c-f40679ea3ce9"
>     val groupId_1 = UUID.randomUUID().toString
>
>     val brokerList: String = "localhost:9092"
>     val acks: Int = -1
>     val metadataFetchTimeout: Long = 3000L
>     val blockOnBufferFull: Boolean = true
>     val bufferSize: Long = 1024L * 1024L
>     val retries: Int = 0
>
>     val producerProps = new Properties()
>     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
>     producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
>     producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG,
> metadataFetchTimeout.toString)
>     producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG,
> blockOnBufferFull.toString)
>     producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
> bufferSize.toString)
>     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
>     producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
>     producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
>
>     val producer = new NewKafkaProducer(producerProps)
>
>     val record = new ProducerRecord(testTopic, 0, "key".getBytes,
> testMessage.getBytes)
>     producer.send(record)
>
>     val consumer = new KafkaConsumer(testTopic, groupId_1,
> "localhost:2181")
>
>     def exec(binaryObject: Array[Byte]) = {
>       val message = new String(binaryObject)
>       print("testMessage = " + testMessage + " and consumed message = " +
> message)
>       consumer.close()
>     }
>
>     print("KafkaSpec is waiting some seconds")
>     consumer.read(exec)
>     print("KafkaSpec consumed")
> }
>
> The KafkaConsumer class is exactly as the same as stealehly's project:
> https://github.com/stealthly/scala-kafka/blob/master/src/main/scala/KafkaConsumer.scala
>
> But when I tried to run the test program, I got the following exception:
> [2014-11-23 21:15:36,461] INFO ProducerConfig values:
>     block.on.buffer.full = true
>     retry.backoff.ms = 100
>     buffer.memory = 33554432
>     batch.size = 16384
>     metrics.sample.window.ms = 30000
>     metadata.max.age.ms = 300000
>     receive.buffer.bytes = 32768
>     timeout.ms = 30000
>     max.in.flight.requests.per.connection = 5
>     metric.reporters = []
>     bootstrap.servers = [localhost:9092]
>     client.id =
>     compression.type = none
>     retries = 0
>     max.request.size = 1048576
>     send.buffer.bytes = 131072
>     acks = 1
>     reconnect.backoff.ms = 10
>     linger.ms = 0
>     metrics.num.samples = 2
>     metadata.fetch.timeout.ms = 60000
>  (org.apache.kafka.clients.producer.ProducerConfig)
> Exception in thread "main" java.lang.ExceptionInInitializerError
>     at
> org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:73)
>     at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:243)
>     at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:255)
>     at
> org.apache.kafka.clients.producer.KafkaProducer.<clinit>(KafkaProducer.java:64)
>     at Test$delayedInit$body.apply(Test.scala:47)
>     at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
>     at
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>     at scala.App$$anonfun$main$1.apply(App.scala:71)
>     at scala.App$$anonfun$main$1.apply(App.scala:71)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
>     at scala.App$class.main(App.scala:71)
>     at Test$.main(Test.scala:21)
>     at Test.main(Test.scala)
> Caused by: java.lang.NullPointerException
>     at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:98)
>     at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:94)
>     at
> kafka.producer.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.scala:63)
>     at
> org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:257)
>     at
> org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:133)
>     at
> org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:97)
>     at
> org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:689)
>     at
> org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:647)
>     at
> org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:568)
>     at
> org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:442)
>     at
> org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:476)
>     at
> org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:471)
>     at org.apache.log4j.LogManager.<clinit>(LogManager.java:125)
>     ... 14 more
>
> I think it should be an error came from ProducerConfig, I spent time on
> find a solution, but no luck. Does anyone could give me a hit/help please?
>
> Regards,
> Haoming
>
>