You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Haoming Zhang <ha...@outlook.com> on 2014/11/24 06:29:13 UTC
Exception in thread "main" java.lang.ExceptionInInitializerError
Hi all,
Basically I used a lot of codes from this project https://github.com/stealthly/scala-kafka , my idea is to sent a key/value pair to Kafka, so that I can design a partition function in the further.
I checked the document and seems I should create a ProducerRecord, then I can specify a partition or key. Follows the codes from stealehly's project, I created a test main function as following:
import org.apache.kafka.clients.producer.{ KafkaProducer => NewKafkaProducer }
import org.apache.kafka.clients.producer.ProducerConfig
import java.util.Properties
object Test extends App {
val testMessage = UUID.randomUUID().toString
val testTopic = "0e7fa3c2-1b75-407b-a03c-f40679ea3ce9"
val groupId_1 = UUID.randomUUID().toString
val brokerList: String = "localhost:9092"
val acks: Int = -1
val metadataFetchTimeout: Long = 3000L
val blockOnBufferFull: Boolean = true
val bufferSize: Long = 1024L * 1024L
val retries: Int = 0
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString)
producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString)
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
val producer = new NewKafkaProducer(producerProps)
val record = new ProducerRecord(testTopic, 0, "key".getBytes, testMessage.getBytes)
producer.send(record)
val consumer = new KafkaConsumer(testTopic, groupId_1, "localhost:2181")
def exec(binaryObject: Array[Byte]) = {
val message = new String(binaryObject)
print("testMessage = " + testMessage + " and consumed message = " + message)
consumer.close()
}
print("KafkaSpec is waiting some seconds")
consumer.read(exec)
print("KafkaSpec consumed")
}
The KafkaConsumer class is exactly as the same as stealehly's project: https://github.com/stealthly/scala-kafka/blob/master/src/main/scala/KafkaConsumer.scala
But when I tried to run the test program, I got the following exception:
[2014-11-23 21:15:36,461] INFO ProducerConfig values:
block.on.buffer.full = true
retry.backoff.ms = 100
buffer.memory = 33554432
batch.size = 16384
metrics.sample.window.ms = 30000
metadata.max.age.ms = 300000
receive.buffer.bytes = 32768
timeout.ms = 30000
max.in.flight.requests.per.connection = 5
metric.reporters = []
bootstrap.servers = [localhost:9092]
client.id =
compression.type = none
retries = 0
max.request.size = 1048576
send.buffer.bytes = 131072
acks = 1
reconnect.backoff.ms = 10
linger.ms = 0
metrics.num.samples = 2
metadata.fetch.timeout.ms = 60000
(org.apache.kafka.clients.producer.ProducerConfig)
Exception in thread "main" java.lang.ExceptionInInitializerError
at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:73)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:243)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:255)
at org.apache.kafka.clients.producer.KafkaProducer.<clinit>(KafkaProducer.java:64)
at Test$delayedInit$body.apply(Test.scala:47)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at Test$.main(Test.scala:21)
at Test.main(Test.scala)
Caused by: java.lang.NullPointerException
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:98)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:94)
at kafka.producer.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.scala:63)
at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:257)
at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:133)
at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:97)
at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:689)
at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:647)
at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:568)
at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:442)
at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:476)
at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:471)
at org.apache.log4j.LogManager.<clinit>(LogManager.java:125)
... 14 more
I think it should be an error came from ProducerConfig, I spent time on find a solution, but no luck. Does anyone could give me a hit/help please?
Regards,
Haoming
Re: Exception in thread "main" java.lang.ExceptionInInitializerError
Posted by Jun Rao <ju...@gmail.com>.
It seems you hit an exception when instantiating the sfl4j logger inside
the producer.
Thanks,
Jun
On Sun, Nov 23, 2014 at 9:29 PM, Haoming Zhang <ha...@outlook.com>
wrote:
>
>
>
> Hi all,
>
> Basically I used a lot of codes from this project
> https://github.com/stealthly/scala-kafka , my idea is to sent a key/value
> pair to Kafka, so that I can design a partition function in the further.
>
> I checked the document and seems I should create a ProducerRecord, then I
> can specify a partition or key. Follows the codes from stealehly's project,
> I created a test main function as following:
> import org.apache.kafka.clients.producer.{ KafkaProducer =>
> NewKafkaProducer }
> import org.apache.kafka.clients.producer.ProducerConfig
> import java.util.Properties
>
> object Test extends App {
> val testMessage = UUID.randomUUID().toString
> val testTopic = "0e7fa3c2-1b75-407b-a03c-f40679ea3ce9"
> val groupId_1 = UUID.randomUUID().toString
>
> val brokerList: String = "localhost:9092"
> val acks: Int = -1
> val metadataFetchTimeout: Long = 3000L
> val blockOnBufferFull: Boolean = true
> val bufferSize: Long = 1024L * 1024L
> val retries: Int = 0
>
> val producerProps = new Properties()
> producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
> producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
> producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG,
> metadataFetchTimeout.toString)
> producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG,
> blockOnBufferFull.toString)
> producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
> bufferSize.toString)
> producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
> producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
> producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
>
> val producer = new NewKafkaProducer(producerProps)
>
> val record = new ProducerRecord(testTopic, 0, "key".getBytes,
> testMessage.getBytes)
> producer.send(record)
>
> val consumer = new KafkaConsumer(testTopic, groupId_1,
> "localhost:2181")
>
> def exec(binaryObject: Array[Byte]) = {
> val message = new String(binaryObject)
> print("testMessage = " + testMessage + " and consumed message = " +
> message)
> consumer.close()
> }
>
> print("KafkaSpec is waiting some seconds")
> consumer.read(exec)
> print("KafkaSpec consumed")
> }
>
> The KafkaConsumer class is exactly as the same as stealehly's project:
> https://github.com/stealthly/scala-kafka/blob/master/src/main/scala/KafkaConsumer.scala
>
> But when I tried to run the test program, I got the following exception:
> [2014-11-23 21:15:36,461] INFO ProducerConfig values:
> block.on.buffer.full = true
> retry.backoff.ms = 100
> buffer.memory = 33554432
> batch.size = 16384
> metrics.sample.window.ms = 30000
> metadata.max.age.ms = 300000
> receive.buffer.bytes = 32768
> timeout.ms = 30000
> max.in.flight.requests.per.connection = 5
> metric.reporters = []
> bootstrap.servers = [localhost:9092]
> client.id =
> compression.type = none
> retries = 0
> max.request.size = 1048576
> send.buffer.bytes = 131072
> acks = 1
> reconnect.backoff.ms = 10
> linger.ms = 0
> metrics.num.samples = 2
> metadata.fetch.timeout.ms = 60000
> (org.apache.kafka.clients.producer.ProducerConfig)
> Exception in thread "main" java.lang.ExceptionInInitializerError
> at
> org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:73)
> at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:243)
> at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:255)
> at
> org.apache.kafka.clients.producer.KafkaProducer.<clinit>(KafkaProducer.java:64)
> at Test$delayedInit$body.apply(Test.scala:47)
> at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
> at
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
> at scala.App$$anonfun$main$1.apply(App.scala:71)
> at scala.App$$anonfun$main$1.apply(App.scala:71)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
> at scala.App$class.main(App.scala:71)
> at Test$.main(Test.scala:21)
> at Test.main(Test.scala)
> Caused by: java.lang.NullPointerException
> at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:98)
> at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:94)
> at
> kafka.producer.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.scala:63)
> at
> org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:257)
> at
> org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:133)
> at
> org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:97)
> at
> org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:689)
> at
> org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:647)
> at
> org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:568)
> at
> org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:442)
> at
> org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:476)
> at
> org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:471)
> at org.apache.log4j.LogManager.<clinit>(LogManager.java:125)
> ... 14 more
>
> I think it should be an error came from ProducerConfig, I spent time on
> find a solution, but no luck. Does anyone could give me a hit/help please?
>
> Regards,
> Haoming
>
>