Posted to users@kafka.apache.org by "Rajaram, Lokesh" <LO...@intuit.com> on 2016/02/08 21:22:38 UTC

Mirror maker stops working (0.8.2.1)

Hello,

We have been seeing a consistent issue mirroring data between our data centers.

Below is our setup
------------------------

Source: AWS (13 Brokers)
Destination: OTHER-DC (20 Brokers)
Topic: mirrortest (source: 260 partitions, destination: 200 partitions)
Connectivity: AWS Direct Connect (max 6Gbps)
Data details: the source is receiving 40,000 msg/sec, and each message is around 5 KB
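(That works out to roughly 40,000 * 5 KB ≈ 200 MB/s ≈ 1.6 Gbps of raw payload, so well within the 6 Gbps link.)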

Mirroring
------------

Node: in our OTHER-DC (24 cores, 48 GB memory)
JVM Details: 1.8.0_51, -Xmx8G -Xms8G -XX:PermSize=96m -XX:MaxPermSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35
Launch script: kafka.tools.MirrorMaker --consumer.config consumer.properties --producer.config producer.properties --num.producers 1 --whitelist mirrortest --num.streams 1 --queue.size 100000
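
Putting the JVM settings and launch script together, the invocation looks roughly like this (a sketch assuming the stock kafka-run-class.sh wrapper, which honors the KAFKA_HEAP_OPTS / KAFKA_JVM_PERFORMANCE_OPTS environment variables; config file paths abbreviated):

export KAFKA_HEAP_OPTS="-Xmx8G -Xms8G -XX:PermSize=96m -XX:MaxPermSize=96m"
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
bin/kafka-run-class.sh kafka.tools.MirrorMaker \
    --consumer.config consumer.properties \
    --producer.config producer.properties \
    --num.producers 1 --whitelist mirrortest \
    --num.streams 1 --queue.size 100000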

consumer.properties
---------------------------
zookeeper.connect=<host:port>
group.id=KafkaMirror
# start from the earliest available offset when no committed offset exists
auto.offset.reset=smallest
# max bytes fetched per partition per request (~9 MB; our messages are ~5 KB)
fetch.message.max.bytes=9000000
zookeeper.connection.timeout.ms=60000
# consumer group rebalance retry count and backoff
rebalance.max.retries=4
rebalance.backoff.ms=5000

producer.properties
--------------------------
# broker list used for bootstrapping cluster metadata (old Scala producer)
metadata.broker.list=<host:port>
partitioner.class=<our custom round robin partitioner>
# messages are queued and sent by a background ProducerSendThread
producer.type=async
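
Worth noting on the async path: we do not set queue.enqueue.timeout.ms, and per the 0.8 producer docs its default is -1, which makes a send block indefinitely once the producer's internal queue fills up. For reference (this is the documented default, not something from our config file):

# 0.8 documented default, not set explicitly by us;
# -1 = block the caller indefinitely when the async queue is full
queue.enqueue.timeout.ms=-1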


Observations
-----------------
When we start the mirroring job, everything works as expected; eventually, though, we hit a point where the job stops consuming entirely.

At this stage:

1. No errors are seen in the MirrorMaker logs

2. The consumer threads are not fetching any messages; their thread dumps look like this:

"ConsumerFetcherThread-KafkaMirror_1454375262613-7b760d87-0-26" - Thread t@73
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <79b6d3ce> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

Locked ownable synchronizers:
- locked <199dc92d> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
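
(Reading this trace: the fetcher thread is parked in LinkedBlockingQueue.put, i.e. the consumer's internal chunk queue is full and nothing downstream is draining it.)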

3. The producer stops producing; with trace logging enabled we see it handling 0 events. Thread dump as follows:

"ProducerSendThread--0" - Thread t@53
  java.lang.Thread.State: RUNNABLE
at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
at sun.nio.ch.IOUtil.write(IOUtil.java:148)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
- locked <5ae2fc40> (a java.lang.Object)
at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
at kafka.network.BlockingChannel.send(BlockingChannel.scala:103)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
- locked <8489cd8> (a java.lang.Object)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

  Locked ownable synchronizers:
- None
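
(Reading this trace: the send thread shows RUNNABLE but is sitting inside a blocking socket write to the destination, which matches the TX traffic drop we describe in point 6 below.)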

4. In the VisualVM MBeans view:
  kafka.producer.async > ProducerSendThread > ProducerQueueSize is at full capacity (100000) the entire time
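
For anyone who wants to track this without VisualVM, the bundled JmxTool should work, roughly as below (a sketch; it assumes the MirrorMaker JVM was started with JMX_PORT=9999 exported, and the ObjectName is our best guess from the VisualVM tree, so adjust it to whatever your JVM actually registers):

bin/kafka-run-class.sh kafka.tools.JmxTool \
    --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
    --object-name 'kafka.producer.async:type=ProducerSendThread,name=ProducerQueueSize' \
    --reporting-interval 5000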

5. No GC activity after this happens
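
(Something like jstat -gcutil <pid> 5000 against the MirrorMaker process is an easy way to confirm the collectors stay idle.)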

6. On the MirrorMaker node we always see TX traffic drop to zero first, with RX dropping shortly after
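
(sar -n DEV 5 on that node is one way to watch the NIC counters, for anyone reproducing this.)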

Please let us know if you need more info.

Thanks,
Lokesh