Posted to users@kafka.apache.org by "Rajaram, Lokesh" <LO...@intuit.com> on 2016/02/08 21:22:38 UTC
Mirror maker stops working (0.8.2.1)
Hello,
We have been seeing a consistent issue when mirroring between our data centers.
Below is our setup
------------------------
Source: AWS (13 Brokers)
Destination: OTHER-DC (20 Brokers)
Topic: mirrortest (source: 260 partitions, destination: 200 partitions)
Connectivity: AWS Direct Connect (max 6Gbps)
Data details: Source is receiving 40,000 msg/sec, each message is around 5KB
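As a rough check on whether the link itself could be the bottleneck, the figures above can be turned into a bandwidth estimate. This is only a back-of-the-envelope sketch: it assumes "5KB" means 5,000 bytes per message and ignores protocol overhead, compression and replication traffic. The function name is made up for illustration.

```python
# Back-of-the-envelope bandwidth estimate for the mirrored topic.
MSGS_PER_SEC = 40_000   # source ingest rate quoted above
MSG_BYTES = 5_000       # assumption: "5KB" taken as 5,000 bytes
LINK_GBPS = 6.0         # AWS Direct Connect maximum quoted above


def required_gbps(msgs_per_sec, msg_bytes):
    """Raw payload bandwidth in gigabits per second (no protocol overhead)."""
    return msgs_per_sec * msg_bytes * 8 / 1e9


need = required_gbps(MSGS_PER_SEC, MSG_BYTES)
print(f"required: {need:.2f} Gbps, link utilisation: {need / LINK_GBPS:.0%}")
```

Under these assumptions the payload needs about 1.6 Gbps, well below the 6 Gbps maximum, so raw link capacity alone would not obviously explain the stall.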
Mirroring
------------
Node: located at our OTHER-DC (24 cores, 48 GB memory)
JVM Details: 1.8.0_51, -Xmx8G -Xms8G -XX:PermSize=96m -XX:MaxPermSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35
Launch script: kafka.tools.MirrorMaker --consumer.config consumer.properties --producer.config producer.properties --num.producers 1 --whitelist mirrortest --num.streams 1 --queue.size 100000
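For a sense of scale, the sketch below estimates how much payload a queue of 100000 entries could hold and how quickly it would fill if the sending side stopped draining it. It assumes that each queue entry is a single message of about 5,000 bytes and that messages arrive at the source rate of 40,000 msg/sec; JVM object overhead is ignored, and the helper names are hypothetical.

```python
# Rough sizing of the internal queue, under the assumptions stated above.
QUEUE_SIZE = 100_000    # value passed as --queue.size
MSG_BYTES = 5_000       # assumption: ~5 KB payload per queued message
MSGS_PER_SEC = 40_000   # assumption: mirror keeps pace with the source rate


def queue_bytes(entries, msg_bytes):
    """Payload bytes held by a completely full queue."""
    return entries * msg_bytes


def seconds_to_fill(entries, msgs_per_sec):
    """Time for the queue to fill once nothing is being drained from it."""
    return entries / msgs_per_sec


print(f"full queue payload: {queue_bytes(QUEUE_SIZE, MSG_BYTES) / 1e6:.0f} MB")
print(f"time to fill: {seconds_to_fill(QUEUE_SIZE, MSGS_PER_SEC):.1f} s")
```

If those assumptions hold, a full queue is on the order of 500 MB of payload inside the 8 GB heap, and it would fill within a few seconds of the producer side stalling.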
consumer.properties
---------------------------
zookeeper.connect=<host:port>
group.id=KafkaMirror
auto.offset.reset=smallest
fetch.message.max.bytes=9000000
zookeeper.connection.timeout.ms=60000
rebalance.max.retries=4
rebalance.backoff.ms=5000
producer.properties
--------------------------
metadata.broker.list=<host:port>
partitioner.class=<our custom round robin partitioner>
producer.type=async
Observations
-----------------
When we start the mirroring job, everything works as expected. Eventually, however, we hit an issue where the job stops consuming altogether.
At this stage:
1. No errors appear in the MirrorMaker logs.
2. Consumer threads are not fetching any messages, and their thread dumps look as follows:
"ConsumerFetcherThread-KafkaMirror_1454375262613-7b760d87-0-26" - Thread t@73
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <79b6d3ce> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Locked ownable synchronizers:
- locked <199dc92d> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
3. The producer stops producing. In trace mode we see it handling 0 events, and its thread dump is as follows:
"ProducerSendThread--0" - Thread t@53
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
at sun.nio.ch.IOUtil.write(IOUtil.java:148)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
- locked <5ae2fc40> (a java.lang.Object)
at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
at kafka.network.BlockingChannel.send(BlockingChannel.scala:103)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
- locked <8489cd8> (a java.lang.Object)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
Locked ownable synchronizers:
- None
4. In the VisualVM MBeans view, kafka.producer.async > ProducerSendThread > ProducerQueueSize stays at full capacity (100000) the whole time.
5. There is no GC activity after this happens.
6. On the MirrorMaker node, TX traffic always drops to zero first, and RX drops shortly after that.
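One way to read these observations together is as backpressure through a bounded queue: if the sending thread is stuck in a socket write that never completes, the queue fills up, and the fetching thread then parks in a blocking put. The Python sketch below is only a toy model of that pattern, not Kafka code and not a confirmed diagnosis. The names `simulate`, `channel` and `socket_writable` are invented for illustration, and a put timeout is used so that the model terminates instead of hanging.

```python
import queue
import threading


def simulate(capacity=5, put_timeout=0.2):
    """Toy model: a sender stuck on a write stalls the fetcher upstream."""
    channel = queue.Queue(maxsize=capacity)   # bounded queue between the threads
    socket_writable = threading.Event()       # never set: models a write that never completes
    sent = []

    def sender():
        while True:
            item = channel.get()
            socket_writable.wait()            # blocks forever in this model
            sent.append(item)

    threading.Thread(target=sender, daemon=True).start()

    enqueued = 0
    fetcher_blocked = False
    for i in range(capacity * 3):
        try:
            # The real fetcher thread blocks indefinitely here; a timeout
            # stands in for that so the model can report what happened.
            channel.put(i, timeout=put_timeout)
            enqueued += 1
        except queue.Full:
            fetcher_blocked = True
            break
    return enqueued, len(sent), fetcher_blocked, channel.full()


enqueued, sent, fetcher_blocked, queue_full = simulate()
print(f"enqueued={enqueued} sent={sent} "
      f"fetcher_blocked={fetcher_blocked} queue_full={queue_full}")
```

In this model nothing is ever sent, the queue stays full, and the fetcher cannot enqueue any more data. That matches the ordering in observation 6, where outbound traffic stops first and inbound traffic stops once the buffers are full.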
Please let us know if you need more info.
Thanks,
Lokesh