You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Bill Zhang (JIRA)" <ji...@apache.org> on 2016/11/03 07:12:59 UTC
[jira] [Commented] (KAFKA-955) After a leader change, messages sent
with ack=0 are lost
[ https://issues.apache.org/jira/browse/KAFKA-955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15631889#comment-15631889 ]
Bill Zhang commented on KAFKA-955:
----------------------------------
I am using Flume with Kafka Channel & facing below issues.
Kafka Version: kafka_2.9.1-0.8.2.0
Flume Version: apache-flume-1.6.0
It seems was resolved from below :
Step 1: copy zookeeper Jar file to Flume classpath
Step 2: a1.channels.c1.kafka.producer.type = async
Note:
i didn't change default value of request.required.acks. It seems works, it is still in testing...
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Issue 1:
02 Nov 2016 22:20:06,201 WARN [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-2] (kafka.utils.Logging$class.warn:83) - Reconnect due to socket error: null
02 Nov 2016 22:20:06,203 INFO [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-2] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-2], Stopped
02 Nov 2016 22:20:06,203 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-2], Shutdown completed
02 Nov 2016 22:20:06,203 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-1], Shutting down
02 Nov 2016 22:20:06,204 WARN [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-1] (kafka.utils.Logging$class.warn:83) - Reconnect due to socket error: null
02 Nov 2016 22:20:06,204 INFO [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-1] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-1], Stopped
02 Nov 2016 22:20:06,204 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-1], Shutdown completed
02 Nov 2016 22:20:06,205 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherManager-1478087994042] All connections stopped
02 Nov 2016 22:20:06,207 INFO [ZkClient-EventThread-58-SATL2036:2181,SATL2037:2181,SATL2038:2181/kafka] (org.I0Itec.zkclient.ZkEventThread.run:82) - Terminate ZkClient event thread.
02 Nov 2016 22:20:06,212 WARN [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.warn:89) - Failed to send producer request with correlation id 34198503 to broker 1 with data for partitions [channel-tbox-parsed-topic,3]
java.nio.channels.ClosedByInterruptException
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:511)
at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
at kafka.network.BlockingChannel.send(BlockingChannel.scala:92)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:72)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.Producer.send(Producer.scala:76)
at kafka.javaapi.producer.Producer.send(Producer.scala:42)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:745)
02 Nov 2016 22:20:06,214 INFO [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68) - Back off for 100 ms before retrying send. Remaining retries = 3
02 Nov 2016 22:20:06,214 WARN [PollableSourceRunner-KafkaSource-r1] (org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit:363) - Sending events to Kafka failed
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:76)
at kafka.producer.Producer.send(Producer.scala:76)
at kafka.javaapi.producer.Producer.send(Producer.scala:42)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:745)
02 Nov 2016 22:20:06,215 ERROR [PollableSourceRunner-KafkaSource-r1] (org.apache.flume.source.kafka.KafkaSource.process:153) - KafkaSource EXCEPTION, {}
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.kafka.KafkaChannel{name: c1}
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Commit failed as send to Kafka failed
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:364)
at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
... 3 more
Caused by: java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:76)
at kafka.producer.Producer.send(Producer.scala:76)
at kafka.javaapi.producer.Producer.send(Producer.scala:42)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
... 5 more
02 Nov 2016 22:20:06,233 INFO [agent-shutdown-hook] (org.apache.zookeeper.ZooKeeper.close:684) - Session: 0x2581dc726ab01ad closed
02 Nov 2016 22:20:06,233 INFO [lifecycleSupervisor-1-1-EventThread] (org.apache.zookeeper.ClientCnxn$EventThread.run:512) - EventThread shut down
02 Nov 2016 22:20:06,239 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [flume_tbox-topic_SATL2036-1478087994030-54387da2], ZKConsumerConnector shut down completed
02 Nov 2016 22:20:06,239 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:150) - Component type: SOURCE, name: r1 stopped
02 Nov 2016 22:20:06,239 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:156) - Shutdown Metric for type: SOURCE, name: r1. source.start.time == 1478087994119
02 Nov 2016 22:20:06,239 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:162) - Shutdown Metric for type: SOURCE, name: r1. source.stop.time == 1478096406239
02 Nov 2016 22:20:06,239 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: SOURCE, name: r1. source.kafka.commit.time == 555
02 Nov 2016 22:20:06,239 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: SOURCE, name: r1. source.kafka.event.get.time == 1409513
02 Nov 2016 22:20:06,239 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: SOURCE, name: r1. src.append-batch.accepted == 0
02 Nov 2016 22:20:06,240 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: SOURCE, name: r1. src.append-batch.received == 0
02 Nov 2016 22:20:06,240 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: SOURCE, name: r1. src.append.accepted == 0
02 Nov 2016 22:20:06,240 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: SOURCE, name: r1. src.append.received == 0
02 Nov 2016 22:20:06,240 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: SOURCE, name: r1. src.events.accepted == 343
02 Nov 2016 22:20:06,240 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: SOURCE, name: r1. src.events.received == 343
02 Nov 2016 22:20:06,240 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: SOURCE, name: r1. src.open-connection.count == 0
02 Nov 2016 22:20:06,240 INFO [agent-shutdown-hook] (org.apache.flume.source.kafka.KafkaSource.stop:237) - Kafka Source r1 stopped. Metrics: SOURCE:r1{src.events.accepted=343, src.open-connection.count=0, src.append.received=0, source.kafka.event.get.time=1409513, src.append-batch.received=0, src.append-batch.accepted=0, src.append.accepted=0, src.events.received=343, source.kafka.commit.time=555}
02 Nov 2016 22:20:06,240 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21], ZKConsumerConnector shutting down
02 Nov 2016 22:20:06,241 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherManager-1478087993018] Stopping leader finder thread
02 Nov 2016 22:20:06,241 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-leader-finder-thread], Shutting down
02 Nov 2016 22:20:06,241 INFO [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-leader-finder-thread] (kafka.utils.Logging$class.info:68) - [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-leader-finder-thread], Stopped
02 Nov 2016 22:20:06,241 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-leader-finder-thread], Shutdown completed
02 Nov 2016 22:20:06,241 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherManager-1478087993018] Stopping all fetchers
02 Nov 2016 22:20:06,241 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-0-1], Shutting down
02 Nov 2016 22:20:06,242 WARN [ConsumerFetcherThread-flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-0-1] (kafka.utils.Logging$class.warn:83) - Reconnect due to socket error: null
02 Nov 2016 22:20:06,242 INFO [ConsumerFetcherThread-flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-0-1] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-0-1], Stopped
02 Nov 2016 22:20:06,242 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-0-1], Shutdown completed
02 Nov 2016 22:20:06,242 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherManager-1478087993018] All connections stopped
02 Nov 2016 22:20:06,243 INFO [ZkClient-EventThread-42-SATL2036:2181,SATL2037:2181,SATL2038:2181/kafka] (org.I0Itec.zkclient.ZkEventThread.run:82) - Terminate ZkClient event thread.
02 Nov 2016 22:20:06,244 INFO [agent-shutdown-hook] (org.apache.zookeeper.ZooKeeper.close:684) - Session: 0x356eb2d4b833fab closed
02 Nov 2016 22:20:06,244 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21], ZKConsumerConnector shut down completed
02 Nov 2016 22:20:06,244 INFO [SinkRunner-PollingRunner-FailoverSinkProcessor-EventThread] (org.apache.zookeeper.ClientCnxn$EventThread.run:512) - EventThread shut down
02 Nov 2016 22:20:06,246 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - Shutting down producer
02 Nov 2016 22:20:06,247 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - Closing all sync producers
02 Nov 2016 22:20:06,256 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - Disconnecting from 10.25.20.36:9092
02 Nov 2016 22:20:06,256 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:150) - Component type: CHANNEL, name: c1 stopped
02 Nov 2016 22:20:06,256 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:156) - Shutdown Metric for type: CHANNEL, name: c1. channel.start.time == 1478087992836
02 Nov 2016 22:20:06,256 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:162) - Shutdown Metric for type: CHANNEL, name: c1. channel.stop.time == 1478096406256
02 Nov 2016 22:20:06,257 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: CHANNEL, name: c1. channel.capacity == 0
02 Nov 2016 22:20:06,257 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: CHANNEL, name: c1. channel.current.size == 0
02 Nov 2016 22:20:06,257 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.attempt == 0
02 Nov 2016 22:20:06,257 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.success == 343
02 Nov 2016 22:20:06,257 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.attempt == 0
02 Nov 2016 22:20:06,257 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.success == 342
02 Nov 2016 22:20:06,257 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: CHANNEL, name: c1. channel.kafka.commit.time == 201
02 Nov 2016 22:20:06,258 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: CHANNEL, name: c1. channel.kafka.event.get.time == 531
02 Nov 2016 22:20:06,258 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: CHANNEL, name: c1. channel.kafka.event.send.time == 1789
02 Nov 2016 22:20:06,258 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: CHANNEL, name: c1. channel.rollback.count == 0
02 Nov 2016 22:20:06,258 INFO [agent-shutdown-hook] (org.apache.flume.channel.kafka.KafkaChannel.stop:123) - Kafka channel c1 stopped. Metrics: CHANNEL:c1{channel.event.put.attempt=0, channel.event.put.success=343, channel.kafka.event.get.time=531, channel.current.size=0, channel.event.take.attempt=0, channel.event.take.success=342, channel.kafka.event.send.time=1789, channel.capacity=0, channel.kafka.commit.time=201, channel.rollback.count=0}
02 Nov 2016 22:20:06,264 WARN [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake:332) - Error while getting events from Kafka
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:306)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:97)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
02 Nov 2016 22:20:06,274 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.kafka.KafkaSink.process:139) - Failed to publish events
org.apache.flume.ChannelException: Error while getting events from Kafka
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:97)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:306)
... 6 more
02 Nov 2016 22:20:06,275 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:150)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Error while getting events from Kafka
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:97)
... 3 more
Caused by: java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:306)
... 6 more
02 Nov 2016 22:20:06,794 INFO [flume_tbox-topic_SATL2036-1478087994030-54387da2_watcher_executor] (kafka.utils.Logging$class.info:68) - [flume_tbox-topic_SATL2036-1478087994030-54387da2], stopping watcher executor thread for consumer flume_tbox-topic_SATL2036-1478087994030-54387da2
02 Nov 2016 22:20:06,816 INFO [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21_watcher_executor] (kafka.utils.Logging$class.info:68) - [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21], stopping watcher executor thread for consumer flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21
02 Nov 2016 22:20:06,887 WARN [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake:332) - Error while getting events from Kafka
java.util.NoSuchElementException
at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
02 Nov 2016 22:20:06,888 ERROR [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:459) - process failed
org.apache.flume.ChannelException: Error while getting events from Kafka
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
... 6 more
02 Nov 2016 22:20:06,889 WARN [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.FailoverSinkProcessor.process:185) - Sink k1 failed and has been sent to failover list
org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Error while getting events from Kafka
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Error while getting events from Kafka
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
... 3 more
Caused by: java.util.NoSuchElementException
at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
... 6 more
02 Nov 2016 22:20:06,896 WARN [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake:332) - Error while getting events from Kafka
java.util.NoSuchElementException
at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
02 Nov 2016 22:20:06,897 ERROR [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:459) - process failed
org.apache.flume.ChannelException: Error while getting events from Kafka
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
... 6 more
02 Nov 2016 22:20:06,898 WARN [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.FailoverSinkProcessor.process:185) - Sink k2 failed and has been sent to failover list
org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Error while getting events from Kafka
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Error while getting events from Kafka
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
... 3 more
Caused by: java.util.NoSuchElementException
at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
... 6 more
02 Nov 2016 22:20:06,898 WARN [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake:332) - Error while getting events from Kafka
java.util.NoSuchElementException
at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
02 Nov 2016 22:20:06,898 ERROR [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:459) - process failed
org.apache.flume.ChannelException: Error while getting events from Kafka
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
... 6 more
02 Nov 2016 22:20:06,899 WARN [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.FailoverSinkProcessor.process:185) - Sink k3 failed and has been sent to failover list
org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Error while getting events from Kafka
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Error while getting events from Kafka
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
... 3 more
Caused by: java.util.NoSuchElementException
at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
... 6 more
02 Nov 2016 22:20:06,899 ERROR [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: All sinks failed to process, nothing left to failover to
at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:191)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
02 Nov 2016 22:20:07,216 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0], ZKConsumerConnector shutting down
02 Nov 2016 22:20:07,217 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherManager-1478088061100] Stopping leader finder thread
02 Nov 2016 22:20:07,217 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-leader-finder-thread], Shutting down
02 Nov 2016 22:20:07,218 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-leader-finder-thread], Shutdown completed
02 Nov 2016 22:20:07,218 INFO [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-leader-finder-thread] (kafka.utils.Logging$class.info:68) - [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-leader-finder-thread], Stopped
02 Nov 2016 22:20:07,218 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherManager-1478088061100] Stopping all fetchers
02 Nov 2016 22:20:07,218 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-2], Shutting down
02 Nov 2016 22:20:07,219 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-2], Shutdown completed
02 Nov 2016 22:20:07,219 INFO [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-2] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-2], Stopped
02 Nov 2016 22:20:07,220 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-1], Shutting down
02 Nov 2016 22:20:07,220 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-1], Shutdown completed
02 Nov 2016 22:20:07,220 INFO [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-1] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-1], Stopped
02 Nov 2016 22:20:07,221 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherManager-1478088061100] All connections stopped
02 Nov 2016 22:20:07,223 INFO [ZkClient-EventThread-48-SATL2036:2181,SATL2037:2181,SATL2038:2181/kafka] (org.I0Itec.zkclient.ZkEventThread.run:82) - Terminate ZkClient event thread.
02 Nov 2016 22:20:07,226 INFO [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0_watcher_executor] (kafka.utils.Logging$class.info:68) - [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0], stopping watcher executor thread for consumer flume_tbox_parsed_SATL2036-1478088061094-7badc6c0
02 Nov 2016 22:20:07,226 INFO [agent-shutdown-hook] (org.apache.zookeeper.ZooKeeper.close:684) - Session: 0x156eb2d4a70421e closed
02 Nov 2016 22:20:07,226 INFO [lifecycleSupervisor-1-0-EventThread] (org.apache.zookeeper.ClientCnxn$EventThread.run:512) - EventThread shut down
02 Nov 2016 22:20:07,227 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0], ZKConsumerConnector shut down completed
Issue 2:
03 Nov 2016 13:31:29,287 INFO [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68) - Back off for 100 ms before retrying send. Remaining retries = 1
03 Nov 2016 13:31:29,388 INFO [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68) - Fetching metadata from broker id:1,host:SATL2037,port:9092 with correlation id 2307612 for 1 topic(s) Set(channel-tbox-topic)
03 Nov 2016 13:31:29,388 INFO [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68) - Connected to SATL2037:9092 for producing
03 Nov 2016 13:31:29,389 INFO [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68) - Disconnecting from SATL2037:9092
03 Nov 2016 13:31:29,443 INFO [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68) - Connected to SATL2037:9092 for producing
03 Nov 2016 13:31:29,549 INFO [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68) - Disconnecting from SATL2037:9092
03 Nov 2016 13:31:29,550 WARN [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.warn:89) - Failed to send producer request with correlation id 2308613 to broker 2 with data for partitions [channel-tbox-topic,4]
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
at sun.nio.ch.IOUtil.write(IOUtil.java:148)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
at kafka.network.BlockingChannel.send(BlockingChannel.scala:92)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:72)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.Producer.send(Producer.scala:76)
at kafka.javaapi.producer.Producer.send(Producer.scala:42)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:745)
03 Nov 2016 13:31:29,550 INFO [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68) - Back off for 100 ms before retrying send. Remaining retries = 0
03 Nov 2016 13:31:29,651 INFO [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68) - Fetching metadata from broker id:0,host:SATL2036,port:9092 with correlation id 2308614 for 1 topic(s) Set(channel-tbox-topic)
03 Nov 2016 13:31:29,651 INFO [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68) - Connected to SATL2036:9092 for producing
03 Nov 2016 13:31:29,652 INFO [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68) - Disconnecting from SATL2036:9092
03 Nov 2016 13:31:29,653 ERROR [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.error:97) - Failed to send requests for topics channel-tbox-topic with correlation ids in [2304607,2308614]
03 Nov 2016 13:31:29,653 WARN [PollableSourceRunner-KafkaSource-r1] (org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit:363) - Sending events to Kafka failed
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.Producer.send(Producer.scala:76)
at kafka.javaapi.producer.Producer.send(Producer.scala:42)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:745)
03 Nov 2016 13:31:29,653 ERROR [PollableSourceRunner-KafkaSource-r1] (org.apache.flume.source.kafka.KafkaSource.process:153) - KafkaSource EXCEPTION, {}
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.kafka.KafkaChannel{name: c1}
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Commit failed as send to Kafka failed
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:364)
at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
... 3 more
Caused by: kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.Producer.send(Producer.scala:76)
at kafka.javaapi.producer.Producer.send(Producer.scala:42)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
... 5 more
> After a leader change, messages sent with ack=0 are lost
> --------------------------------------------------------
>
> Key: KAFKA-955
> URL: https://issues.apache.org/jira/browse/KAFKA-955
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8.0
> Reporter: Jason Rosenberg
> Assignee: Guozhang Wang
> Fix For: 0.8.0
>
> Attachments: KAFKA-955-followup.v1.patch, KAFKA-955.v1.patch, KAFKA-955.v1.patch, KAFKA-955.v2.patch, KAFKA-955.v3.patch, KAFKA-955.v4.patch, KAFKA-955.v5.patch, KAFKA-955.v6.patch, KAFKA-955.v7.patch
>
>
> If the leader changes for a partition, and a producer is sending messages with ack=0, then messages will be lost, since the producer has no active way of knowing that the leader has changed, until it's next metadata refresh update.
> The broker receiving the message, which is no longer the leader, logs a message like this:
> Produce request with correlation id 7136261 from client on partition [mytopic,0] failed due to Leader not local for partition [mytopic,0] on broker 508818741
> This is exacerbated by the controlled shutdown mechanism, which forces an immediate leader change.
> A possible solution to this would be for a broker which receives a message, for a topic that it is no longer the leader for (and if the ack level is 0), then the broker could just silently forward the message over to the current leader.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)