Posted to users@kafka.apache.org by Elizabeth Bennett <eb...@loggly.com> on 2015/02/18 23:41:52 UTC

Re: Thread safety of Encoder implementations

Hi Guozhang,
Sorry for the delayed response. Here is the code around our producer send
call.

We instantiate the producer like this:

  Producer<EventType, EventType> producer = new Producer<EventType, EventType>(
      config,
      context.getEventSerializer(),
      new ProducerHandlerWrapper(config, context.getCallback()),
      null,
      context.getPartitioner());

The ProducerHandlerWrapper just wraps the DefaultEventHandler and invokes a
callback if an exception is thrown. The context.getEventSerializer() call
returns an object that implements the Encoder class.
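
To illustrate the wrapper idea, here is a minimal sketch. It is not our
actual ProducerHandlerWrapper: BatchHandler and CallbackHandlerWrapper are
hypothetical names, and BatchHandler is only a simplified stand-in for the
Kafka 0.7 event handler interface, whose real signature differs.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for the producer's event handler contract.
interface BatchHandler<T> {
    void handle(List<T> batch);
}

// Delegates to an inner handler and reports any failure to a callback.
final class CallbackHandlerWrapper<T> implements BatchHandler<T> {
    private final BatchHandler<T> delegate;
    private final Consumer<Exception> onError;

    CallbackHandlerWrapper(BatchHandler<T> delegate, Consumer<Exception> onError) {
        this.delegate = delegate;
        this.onError = onError;
    }

    @Override
    public void handle(List<T> batch) {
        try {
            delegate.handle(batch);
        } catch (RuntimeException e) {
            // Hand the failure to the callback instead of propagating it.
            onError.accept(e);
        }
    }
}
```

Note that the wrapper itself adds no synchronization, so it does not change
which thread ends up calling the encoder.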

The kafka config object we pass has these parameters:

    p.setProperty("serializer.class", eventSerializer.getClass().getName());
    p.setProperty("broker.list", kafkaBrokerList.toString());
    // Kafka default is 10K
    p.setProperty("queue.size", "50000");
    p.setProperty("queue.enqueue.timeout.ms", "-1");
    p.setProperty("max.message.size",
        Property.KafkaProducerMaxMessageSizeBytes.value());
    p.setProperty("buffer.size",
        Property.KafkaProducerSocketBufferSizeBytes.value());
    p.setProperty("connect.timeout.ms",
        Property.KafkaProducerConnectTimeoutMillis.value());
    p.setProperty("socket.timeout.ms",
        String.valueOf(TimeValue.parseMillisStrict(
            Property.KafkaProducerSocketTimeout.value())));

So we actually specify the "serializer.class" property in the KafkaConfig
object as well as pass an Encoder in the constructor of the Producer.

When we are actually producing an event, we do it like this:

producer.send(convertToScalaProducerDataList(topic, event));

This is the convertToScalaProducerDataList method:

  // Java to Scala conversions
  private scala.collection.Seq<kafka.producer.ProducerData<EventType, EventType>>
      convertToScalaProducerDataList(String topic, EventType e) {
    ArrayList<ProducerData<EventType, EventType>> list = new ArrayList<>(1);
    list.add(new ProducerData<EventType, EventType>(topic, e));
    return JavaConversions.asScalaBuffer(list);
  }

We are planning on upgrading to Kafka 0.8 sometime this year, so I'm glad
to know that the upgrade will solve this issue. In the meantime, let me know
if you think the calling pattern we are using could cause concurrent
access of the Encoder class. Thank you!
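
For reference, one defensive option on our side would be to give each
calling thread its own serializer, so the encoder is safe no matter how many
threads the producer uses. Below is a minimal sketch of that idea using
ThreadLocal. UnsafeSerializer is a hypothetical stand-in for a
non-thread-safe serializer such as Kryo, and PerThreadEncoder is a
hypothetical name; neither is our real code or a Kafka API.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a non-thread-safe serializer: it reuses one
// mutable scratch buffer, so concurrent calls on one instance would clash.
final class UnsafeSerializer {
    private final StringBuilder scratch = new StringBuilder();

    byte[] serialize(String event) {
        scratch.setLength(0);
        scratch.append("event:").append(event);
        return scratch.toString().getBytes(StandardCharsets.UTF_8);
    }
}

// Hypothetical encoder: each calling thread lazily gets its own serializer.
final class PerThreadEncoder {
    private static final ThreadLocal<UnsafeSerializer> SERIALIZER =
        ThreadLocal.withInitial(UnsafeSerializer::new);

    byte[] toBytes(String event) {
        return SERIALIZER.get().serialize(event);
    }

    // Exposed only so the per-thread behaviour can be checked.
    static UnsafeSerializer currentSerializer() {
        return SERIALIZER.get();
    }
}
```

The trade-off is one serializer instance per thread, which costs some
memory but avoids locking on every message.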

Cheers,
Liz B.

On Thu, Jan 15, 2015 at 3:42 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Liz,
>
> Could you paste your code for calling the producer send call here? Just
> realized in 0.7 there might be some calling pattern corner cases that cause
> concurrent access of the serializer.
>
> Also, I would recommend trying out the new version of Kafka (0.8.x), in
> which each producer has only one background thread for sending data,
> guaranteeing thread safety.
>
> Guozhang
>
> On Tue, Jan 13, 2015 at 11:40 AM, Elizabeth Bennett <eb...@loggly.com>
> wrote:
>
> > Hi Guozhang,
> > Thanks for your response. We've only got one producer client (per Kryo
> > instance) but the producer client is configured (via the broker.list
> > config) to produce to two Kafka brokers. When we create the Producer, we
> > pass in an instance of the serializer. What if we used the serializer.class
> > config to specify the class name of the serializer rather than pass in an
> > instance? Would Kafka then create a separate serializer instance for each
> > broker that it produces to? That would solve our problem assuming that the
> > Producer spawns new threads for each Kafka broker that it produces to,
> > which I'm not sure about.
> >
> > --Liz
> >
> > On Mon, Jan 12, 2015 at 10:55 PM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> >
> > > Hi Liz,
> > >
> > > Do you have multiple producer clients that use the same Kryo serializer
> > > objects? Each client will only have one background thread that tries to
> > > call serialize(), and hence in that case you will have concurrent access.
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Jan 12, 2015 at 5:32 PM, Elizabeth Bennett <ebennett@loggly.com>
> > > wrote:
> > >
> > > > Hi Kafka Users,
> > > > I have written my own implementation of the Kafka Encoder class for
> > > > serializing objects to Messages. It uses Kryo, which is a
> > > > non-thread-safe Java serialization library. I'm using Kafka 0.7.2.
> > > >
> > > > We recently ran into an issue where we increased the number of Kafka
> > > > brokers for our Kafka producer from 1 to 2. When we did this, we ran into
> > > > exceptions that seemed related to Kryo being used concurrently by multiple
> > > > threads. So, my question is, do I need to modify my Encoder class to be
> > > > thread safe? I dug through the Kafka documentation and couldn't find
> > > > anything that said one way or another. Any information would be great.
> > > > Thank you!
> > > >
> > > > --Liz Bennett
> > > >
> > > > p.s. for what it's worth here is a stack trace from one of the exceptions
> > > > we saw:
> > > >
> > > > 2015-01-08 07:33:35,938 [ERROR] [ProducerHandlerWrapper.handle] Failed to write 9 batched events to Kafka.
> > > > com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException: 40
> > > > Serialization trace:
> > > > fieldGroups (com.loggly.core.event.Event)
> > > > event (com.loggly.core.event.FailedEvent)
> > > >         at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
> > > >         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:474)
> > > >         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:538)
> > > >         at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> > > >         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:474)
> > > >         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:520)
> > > >         at com.loggly.eventreader.kryo.KryoEventSerDes.serialize(KryoEventSerDes.java:39)
> > > >         at com.loggly.kafka.serializer.KryoFailedEventSerializer.toMessage(KryoFailedEventSerializer.java:23)
> > > >         at com.loggly.kafka.serializer.KryoFailedEventSerializer.toMessage(KryoFailedEventSerializer.java:8)
> > > >         at kafka.producer.async.DefaultEventHandler$$anonfun$2$$anonfun$apply$2.apply(DefaultEventHandler.scala:74)
> > > >         at kafka.producer.async.DefaultEventHandler$$anonfun$2$$anonfun$apply$2.apply(DefaultEventHandler.scala:74)
> > > >         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
> > > >         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
> > > >         at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> > > >         at scala.collection.immutable.List.foreach(List.scala:45)
> > > >         at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:30)
> > > >         at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:42)
> > > >         at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
> > > >         at scala.collection.mutable.ListBuffer.map(ListBuffer.scala:42)
> > > >         at kafka.producer.async.DefaultEventHandler$$anonfun$2.apply(DefaultEventHandler.scala:74)
> > > >         at kafka.producer.async.DefaultEventHandler$$anonfun$2.apply(DefaultEventHandler.scala:74)
> > > >         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
> > > >         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
> > > >         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
> > > >         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
> > > >         at scala.collection.Iterator$class.foreach(Iterator.scala:660)
> > > >         at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
> > > >         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
> > > >         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:43)
> > > >         at scala.collection.mutable.HashMap.foreach(HashMap.scala:93)
> > > >         at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
> > > >         at scala.collection.mutable.HashMap.map(HashMap.scala:43)
> > > >         at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:74)
> > > >         at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
> > > >         at com.loggly.kafka.producer.ProducerHandlerWrapper.handle(ProducerHandlerWrapper.java:64)
> > > >         at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
> > > >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
> > > >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
> > > >         at scala.collection.immutable.Stream.foreach(Stream.scala:291)
> > > >         at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
> > > >         at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
> > > > Caused by: java.lang.ArrayIndexOutOfBoundsException: 40
> > > >         at com.esotericsoftware.kryo.util.ObjectMap.resize(ObjectMap.java:460)
> > > >         at com.esotericsoftware.kryo.util.ObjectMap.put_internal(ObjectMap.java:125)
> > > >         at com.esotericsoftware.kryo.util.ObjectMap.put(ObjectMap.java:73)
> > > >         at com.esotericsoftware.kryo.util.DefaultClassResolver.register(DefaultClassResolver.java:49)
> > > >         at com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56)
> > > >         at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:476)
> > > >         at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
> > > >         at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:503)
> > > >         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:608)
> > > >         at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:91)
> > > >         at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
> > > >         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:538)
> > > >         at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> > > >         ... 40 more
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>