Posted to dev@kafka.apache.org by "Bill Zhang (JIRA)" <ji...@apache.org> on 2016/11/03 07:12:59 UTC

[jira] [Commented] (KAFKA-955) After a leader change, messages sent with ack=0 are lost

    [ https://issues.apache.org/jira/browse/KAFKA-955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15631889#comment-15631889 ] 

Bill Zhang commented on KAFKA-955:
----------------------------------

I am using Flume with the Kafka Channel and am seeing the issue below.

Kafka Version: kafka_2.9.1-0.8.2.0
Flume Version: apache-flume-1.6.0

It seems to have been resolved by the following steps:
Step 1: Copy the ZooKeeper JAR file to the Flume classpath.
Step 2: Set a1.channels.c1.kafka.producer.type = async

Note:
I didn't change the default value of request.required.acks. It seems to work, but it is still being tested.
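
For readers who want to try Step 2, the setting might sit in a Flume agent properties file roughly as sketched here. Only the producer.type line comes from the comment above. The broker list, ZooKeeper quorum, topic and group values are placeholders, and the remaining property names are assumed from the Flume 1.6 Kafka Channel configuration, so check them against your own setup.

```properties
# Hypothetical Flume 1.6 agent fragment; agent name a1 and channel name c1
# follow the property quoted in Step 2.
a1.channels = c1
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

# Placeholder connection values (assumptions, not taken from the report);
# substitute your own brokers, ZooKeeper quorum, topic and consumer group.
a1.channels.c1.brokerList = broker1:9092,broker2:9092
a1.channels.c1.zookeeperConnect = zk1:2181,zk2:2181,zk3:2181/kafka
a1.channels.c1.topic = example-channel-topic
a1.channels.c1.groupId = example-channel-group

# Step 2: properties under the "kafka." prefix are passed to the Kafka client.
a1.channels.c1.kafka.producer.type = async

# request.required.acks is intentionally not set, matching the note above.
```

With the old Scala producer, async mode queues messages and sends them from a background thread, so a failed send may only surface after the call has returned. That is worth keeping in mind while this change is still being tested.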


~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Issue 1:
02 Nov 2016 22:20:06,201 WARN  [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-2] (kafka.utils.Logging$class.warn:83)  - Reconnect due to socket error: null
02 Nov 2016 22:20:06,203 INFO  [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-2] (kafka.utils.Logging$class.info:68)  - [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-2], Stopped 
02 Nov 2016 22:20:06,203 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-2], Shutdown completed
02 Nov 2016 22:20:06,203 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-1], Shutting down
02 Nov 2016 22:20:06,204 WARN  [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-1] (kafka.utils.Logging$class.warn:83)  - Reconnect due to socket error: null
02 Nov 2016 22:20:06,204 INFO  [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-1] (kafka.utils.Logging$class.info:68)  - [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-1], Stopped 
02 Nov 2016 22:20:06,204 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-1], Shutdown completed
02 Nov 2016 22:20:06,205 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [ConsumerFetcherManager-1478087994042] All connections stopped
02 Nov 2016 22:20:06,207 INFO  [ZkClient-EventThread-58-SATL2036:2181,SATL2037:2181,SATL2038:2181/kafka] (org.I0Itec.zkclient.ZkEventThread.run:82)  - Terminate ZkClient event thread.
02 Nov 2016 22:20:06,212 WARN  [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.warn:89)  - Failed to send producer request with correlation id 34198503 to broker 1 with data for partitions [channel-tbox-parsed-topic,3]
java.nio.channels.ClosedByInterruptException
	at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:511)
	at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
	at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
	at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
	at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
	at kafka.network.BlockingChannel.send(BlockingChannel.scala:92)
	at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:72)
	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
	at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
	at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
	at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
	at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
	at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
	at kafka.producer.Producer.send(Producer.scala:76)
	at kafka.javaapi.producer.Producer.send(Producer.scala:42)
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
	at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
	at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
	at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)
	at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
	at java.lang.Thread.run(Thread.java:745)
02 Nov 2016 22:20:06,214 INFO  [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68)  - Back off for 100 ms before retrying send. Remaining retries = 3
02 Nov 2016 22:20:06,214 WARN  [PollableSourceRunner-KafkaSource-r1] (org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit:363)  - Sending events to Kafka failed
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:76)
	at kafka.producer.Producer.send(Producer.scala:76)
	at kafka.javaapi.producer.Producer.send(Producer.scala:42)
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
	at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
	at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
	at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)
	at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
	at java.lang.Thread.run(Thread.java:745)
02 Nov 2016 22:20:06,215 ERROR [PollableSourceRunner-KafkaSource-r1] (org.apache.flume.source.kafka.KafkaSource.process:153)  - KafkaSource EXCEPTION, {}
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.kafka.KafkaChannel{name: c1}
	at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
	at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)
	at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Commit failed as send to Kafka failed
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:364)
	at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
	at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
	... 3 more
Caused by: java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:76)
	at kafka.producer.Producer.send(Producer.scala:76)
	at kafka.javaapi.producer.Producer.send(Producer.scala:42)
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
	... 5 more
02 Nov 2016 22:20:06,233 INFO  [agent-shutdown-hook] (org.apache.zookeeper.ZooKeeper.close:684)  - Session: 0x2581dc726ab01ad closed
02 Nov 2016 22:20:06,233 INFO  [lifecycleSupervisor-1-1-EventThread] (org.apache.zookeeper.ClientCnxn$EventThread.run:512)  - EventThread shut down
02 Nov 2016 22:20:06,239 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [flume_tbox-topic_SATL2036-1478087994030-54387da2], ZKConsumerConnector shut down completed
02 Nov 2016 22:20:06,239 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:150)  - Component type: SOURCE, name: r1 stopped
02 Nov 2016 22:20:06,239 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:156)  - Shutdown Metric for type: SOURCE, name: r1. source.start.time == 1478087994119
02 Nov 2016 22:20:06,239 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:162)  - Shutdown Metric for type: SOURCE, name: r1. source.stop.time == 1478096406239
02 Nov 2016 22:20:06,239 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown Metric for type: SOURCE, name: r1. source.kafka.commit.time == 555
02 Nov 2016 22:20:06,239 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown Metric for type: SOURCE, name: r1. source.kafka.event.get.time == 1409513
02 Nov 2016 22:20:06,239 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown Metric for type: SOURCE, name: r1. src.append-batch.accepted == 0
02 Nov 2016 22:20:06,240 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown Metric for type: SOURCE, name: r1. src.append-batch.received == 0
02 Nov 2016 22:20:06,240 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown Metric for type: SOURCE, name: r1. src.append.accepted == 0
02 Nov 2016 22:20:06,240 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown Metric for type: SOURCE, name: r1. src.append.received == 0
02 Nov 2016 22:20:06,240 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown Metric for type: SOURCE, name: r1. src.events.accepted == 343
02 Nov 2016 22:20:06,240 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown Metric for type: SOURCE, name: r1. src.events.received == 343
02 Nov 2016 22:20:06,240 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown Metric for type: SOURCE, name: r1. src.open-connection.count == 0
02 Nov 2016 22:20:06,240 INFO  [agent-shutdown-hook] (org.apache.flume.source.kafka.KafkaSource.stop:237)  - Kafka Source r1 stopped. Metrics: SOURCE:r1{src.events.accepted=343, src.open-connection.count=0, src.append.received=0, source.kafka.event.get.time=1409513, src.append-batch.received=0, src.append-batch.accepted=0, src.append.accepted=0, src.events.received=343, source.kafka.commit.time=555}
02 Nov 2016 22:20:06,240 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21], ZKConsumerConnector shutting down
02 Nov 2016 22:20:06,241 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [ConsumerFetcherManager-1478087993018] Stopping leader finder thread
02 Nov 2016 22:20:06,241 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-leader-finder-thread], Shutting down
02 Nov 2016 22:20:06,241 INFO  [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-leader-finder-thread] (kafka.utils.Logging$class.info:68)  - [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-leader-finder-thread], Stopped 
02 Nov 2016 22:20:06,241 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-leader-finder-thread], Shutdown completed
02 Nov 2016 22:20:06,241 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [ConsumerFetcherManager-1478087993018] Stopping all fetchers
02 Nov 2016 22:20:06,241 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [ConsumerFetcherThread-flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-0-1], Shutting down
02 Nov 2016 22:20:06,242 WARN  [ConsumerFetcherThread-flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-0-1] (kafka.utils.Logging$class.warn:83)  - Reconnect due to socket error: null
02 Nov 2016 22:20:06,242 INFO  [ConsumerFetcherThread-flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-0-1] (kafka.utils.Logging$class.info:68)  - [ConsumerFetcherThread-flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-0-1], Stopped 
02 Nov 2016 22:20:06,242 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [ConsumerFetcherThread-flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-0-1], Shutdown completed
02 Nov 2016 22:20:06,242 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [ConsumerFetcherManager-1478087993018] All connections stopped
02 Nov 2016 22:20:06,243 INFO  [ZkClient-EventThread-42-SATL2036:2181,SATL2037:2181,SATL2038:2181/kafka] (org.I0Itec.zkclient.ZkEventThread.run:82)  - Terminate ZkClient event thread.
02 Nov 2016 22:20:06,244 INFO  [agent-shutdown-hook] (org.apache.zookeeper.ZooKeeper.close:684)  - Session: 0x356eb2d4b833fab closed
02 Nov 2016 22:20:06,244 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21], ZKConsumerConnector shut down completed
02 Nov 2016 22:20:06,244 INFO  [SinkRunner-PollingRunner-FailoverSinkProcessor-EventThread] (org.apache.zookeeper.ClientCnxn$EventThread.run:512)  - EventThread shut down
02 Nov 2016 22:20:06,246 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - Shutting down producer
02 Nov 2016 22:20:06,247 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - Closing all sync producers
02 Nov 2016 22:20:06,256 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - Disconnecting from 10.25.20.36:9092
02 Nov 2016 22:20:06,256 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:150)  - Component type: CHANNEL, name: c1 stopped
02 Nov 2016 22:20:06,256 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:156)  - Shutdown Metric for type: CHANNEL, name: c1. channel.start.time == 1478087992836
02 Nov 2016 22:20:06,256 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:162)  - Shutdown Metric for type: CHANNEL, name: c1. channel.stop.time == 1478096406256
02 Nov 2016 22:20:06,257 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown Metric for type: CHANNEL, name: c1. channel.capacity == 0
02 Nov 2016 22:20:06,257 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown Metric for type: CHANNEL, name: c1. channel.current.size == 0
02 Nov 2016 22:20:06,257 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.attempt == 0
02 Nov 2016 22:20:06,257 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.success == 343
02 Nov 2016 22:20:06,257 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.attempt == 0
02 Nov 2016 22:20:06,257 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.success == 342
02 Nov 2016 22:20:06,257 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown Metric for type: CHANNEL, name: c1. channel.kafka.commit.time == 201
02 Nov 2016 22:20:06,258 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown Metric for type: CHANNEL, name: c1. channel.kafka.event.get.time == 531
02 Nov 2016 22:20:06,258 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown Metric for type: CHANNEL, name: c1. channel.kafka.event.send.time == 1789
02 Nov 2016 22:20:06,258 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown Metric for type: CHANNEL, name: c1. channel.rollback.count == 0
02 Nov 2016 22:20:06,258 INFO  [agent-shutdown-hook] (org.apache.flume.channel.kafka.KafkaChannel.stop:123)  - Kafka channel c1 stopped. Metrics: CHANNEL:c1{channel.event.put.attempt=0, channel.event.put.success=343, channel.kafka.event.get.time=531, channel.current.size=0, channel.event.take.attempt=0, channel.event.take.success=342, channel.kafka.event.send.time=1789, channel.capacity=0, channel.kafka.commit.time=201, channel.rollback.count=0}
02 Nov 2016 22:20:06,264 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake:332)  - Error while getting events from Kafka
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
	at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65)
	at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
	at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
	at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:306)
	at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
	at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
	at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:97)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
	at java.lang.Thread.run(Thread.java:745)
02 Nov 2016 22:20:06,274 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.kafka.KafkaSink.process:139)  - Failed to publish events
org.apache.flume.ChannelException: Error while getting events from Kafka
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
	at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
	at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
	at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:97)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
	at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65)
	at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
	at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
	at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:306)
	... 6 more
02 Nov 2016 22:20:06,275 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
	at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:150)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Error while getting events from Kafka
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
	at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
	at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
	at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:97)
	... 3 more
Caused by: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
	at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65)
	at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
	at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
	at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:306)
	... 6 more
02 Nov 2016 22:20:06,794 INFO  [flume_tbox-topic_SATL2036-1478087994030-54387da2_watcher_executor] (kafka.utils.Logging$class.info:68)  - [flume_tbox-topic_SATL2036-1478087994030-54387da2], stopping watcher executor thread for consumer flume_tbox-topic_SATL2036-1478087994030-54387da2
02 Nov 2016 22:20:06,816 INFO  [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21_watcher_executor] (kafka.utils.Logging$class.info:68)  - [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21], stopping watcher executor thread for consumer flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21
02 Nov 2016 22:20:06,887 WARN  [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake:332)  - Error while getting events from Kafka
java.util.NoSuchElementException
	at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
	at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
	at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
	at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
	at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
	at java.lang.Thread.run(Thread.java:745)
02 Nov 2016 22:20:06,888 ERROR [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:459)  - process failed
org.apache.flume.ChannelException: Error while getting events from Kafka
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
	at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
	at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
	at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
	at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
	at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
	... 6 more
02 Nov 2016 22:20:06,889 WARN  [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.FailoverSinkProcessor.process:185)  - Sink k1 failed and has been sent to failover list
org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Error while getting events from Kafka
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
	at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Error while getting events from Kafka
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
	at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
	at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
	... 3 more
Caused by: java.util.NoSuchElementException
	at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
	at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
	... 6 more
02 Nov 2016 22:20:06,896 WARN  [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake:332)  - Error while getting events from Kafka
java.util.NoSuchElementException
	at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
	at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
	at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
	at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
	at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
	at java.lang.Thread.run(Thread.java:745)
02 Nov 2016 22:20:06,897 ERROR [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:459)  - process failed
org.apache.flume.ChannelException: Error while getting events from Kafka
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
	at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
	at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
	at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
	at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
	at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
	... 6 more
02 Nov 2016 22:20:06,898 WARN  [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.FailoverSinkProcessor.process:185)  - Sink k2 failed and has been sent to failover list
org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Error while getting events from Kafka
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
	at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Error while getting events from Kafka
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
	at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
	at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
	... 3 more
Caused by: java.util.NoSuchElementException
	at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
	at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
	... 6 more
02 Nov 2016 22:20:06,898 WARN  [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake:332)  - Error while getting events from Kafka
java.util.NoSuchElementException
	at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
	at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
	at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
	at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
	at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
	at java.lang.Thread.run(Thread.java:745)
02 Nov 2016 22:20:06,898 ERROR [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:459)  - process failed
org.apache.flume.ChannelException: Error while getting events from Kafka
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
	at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
	at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
	at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
	at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
	at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
	... 6 more
02 Nov 2016 22:20:06,899 WARN  [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.FailoverSinkProcessor.process:185)  - Sink k3 failed and has been sent to failover list
org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Error while getting events from Kafka
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
	at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Error while getting events from Kafka
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
	at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
	at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
	... 3 more
Caused by: java.util.NoSuchElementException
	at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
	at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
	... 6 more
02 Nov 2016 22:20:06,899 ERROR [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: All sinks failed to process, nothing left to failover to
	at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:191)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
	at java.lang.Thread.run(Thread.java:745)
02 Nov 2016 22:20:07,216 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0], ZKConsumerConnector shutting down
02 Nov 2016 22:20:07,217 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [ConsumerFetcherManager-1478088061100] Stopping leader finder thread
02 Nov 2016 22:20:07,217 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-leader-finder-thread], Shutting down
02 Nov 2016 22:20:07,218 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-leader-finder-thread], Shutdown completed
02 Nov 2016 22:20:07,218 INFO  [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-leader-finder-thread] (kafka.utils.Logging$class.info:68)  - [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-leader-finder-thread], Stopped 
02 Nov 2016 22:20:07,218 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [ConsumerFetcherManager-1478088061100] Stopping all fetchers
02 Nov 2016 22:20:07,218 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-2], Shutting down
02 Nov 2016 22:20:07,219 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-2], Shutdown completed
02 Nov 2016 22:20:07,219 INFO  [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-2] (kafka.utils.Logging$class.info:68)  - [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-2], Stopped 
02 Nov 2016 22:20:07,220 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-1], Shutting down
02 Nov 2016 22:20:07,220 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-1], Shutdown completed
02 Nov 2016 22:20:07,220 INFO  [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-1] (kafka.utils.Logging$class.info:68)  - [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-1], Stopped 
02 Nov 2016 22:20:07,221 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [ConsumerFetcherManager-1478088061100] All connections stopped
02 Nov 2016 22:20:07,223 INFO  [ZkClient-EventThread-48-SATL2036:2181,SATL2037:2181,SATL2038:2181/kafka] (org.I0Itec.zkclient.ZkEventThread.run:82)  - Terminate ZkClient event thread.
02 Nov 2016 22:20:07,226 INFO  [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0_watcher_executor] (kafka.utils.Logging$class.info:68)  - [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0], stopping watcher executor thread for consumer flume_tbox_parsed_SATL2036-1478088061094-7badc6c0
02 Nov 2016 22:20:07,226 INFO  [agent-shutdown-hook] (org.apache.zookeeper.ZooKeeper.close:684)  - Session: 0x156eb2d4a70421e closed
02 Nov 2016 22:20:07,226 INFO  [lifecycleSupervisor-1-0-EventThread] (org.apache.zookeeper.ClientCnxn$EventThread.run:512)  - EventThread shut down
02 Nov 2016 22:20:07,227 INFO  [agent-shutdown-hook] (kafka.utils.Logging$class.info:68)  - [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0], ZKConsumerConnector shut down completed



Issue 2:
03 Nov 2016 13:31:29,287 INFO  [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68)  - Back off for 100 ms before retrying send. Remaining retries = 1
03 Nov 2016 13:31:29,388 INFO  [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68)  - Fetching metadata from broker id:1,host:SATL2037,port:9092 with correlation id 2307612 for 1 topic(s) Set(channel-tbox-topic)
03 Nov 2016 13:31:29,388 INFO  [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68)  - Connected to SATL2037:9092 for producing
03 Nov 2016 13:31:29,389 INFO  [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68)  - Disconnecting from SATL2037:9092
03 Nov 2016 13:31:29,443 INFO  [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68)  - Connected to SATL2037:9092 for producing
03 Nov 2016 13:31:29,549 INFO  [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68)  - Disconnecting from SATL2037:9092
03 Nov 2016 13:31:29,550 WARN  [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.warn:89)  - Failed to send producer request with correlation id 2308613 to broker 2 with data for partitions [channel-tbox-topic,4]
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
	at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
	at sun.nio.ch.IOUtil.write(IOUtil.java:148)
	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
	at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
	at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
	at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
	at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
	at kafka.network.BlockingChannel.send(BlockingChannel.scala:92)
	at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:72)
	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
	at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
	at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
	at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
	at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
	at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
	at kafka.producer.Producer.send(Producer.scala:76)
	at kafka.javaapi.producer.Producer.send(Producer.scala:42)
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
	at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
	at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
	at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)
	at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
	at java.lang.Thread.run(Thread.java:745)
03 Nov 2016 13:31:29,550 INFO  [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68)  - Back off for 100 ms before retrying send. Remaining retries = 0
03 Nov 2016 13:31:29,651 INFO  [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68)  - Fetching metadata from broker id:0,host:SATL2036,port:9092 with correlation id 2308614 for 1 topic(s) Set(channel-tbox-topic)
03 Nov 2016 13:31:29,651 INFO  [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68)  - Connected to SATL2036:9092 for producing
03 Nov 2016 13:31:29,652 INFO  [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68)  - Disconnecting from SATL2036:9092
03 Nov 2016 13:31:29,653 ERROR [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.error:97)  - Failed to send requests for topics channel-tbox-topic with correlation ids in [2304607,2308614]
03 Nov 2016 13:31:29,653 WARN  [PollableSourceRunner-KafkaSource-r1] (org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit:363)  - Sending events to Kafka failed
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
	at kafka.producer.Producer.send(Producer.scala:76)
	at kafka.javaapi.producer.Producer.send(Producer.scala:42)
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
	at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
	at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
	at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)
	at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
	at java.lang.Thread.run(Thread.java:745)
03 Nov 2016 13:31:29,653 ERROR [PollableSourceRunner-KafkaSource-r1] (org.apache.flume.source.kafka.KafkaSource.process:153)  - KafkaSource EXCEPTION, {}
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.kafka.KafkaChannel{name: c1}
	at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
	at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)
	at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Commit failed as send to Kafka failed
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:364)
	at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
	at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
	... 3 more
Caused by: kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
	at kafka.producer.Producer.send(Producer.scala:76)
	at kafka.javaapi.producer.Producer.send(Producer.scala:42)
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
	... 5 more

> After a leader change, messages sent with ack=0 are lost
> --------------------------------------------------------
>
>                 Key: KAFKA-955
>                 URL: https://issues.apache.org/jira/browse/KAFKA-955
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.0
>            Reporter: Jason Rosenberg
>            Assignee: Guozhang Wang
>             Fix For: 0.8.0
>
>         Attachments: KAFKA-955-followup.v1.patch, KAFKA-955.v1.patch, KAFKA-955.v1.patch, KAFKA-955.v2.patch, KAFKA-955.v3.patch, KAFKA-955.v4.patch, KAFKA-955.v5.patch, KAFKA-955.v6.patch, KAFKA-955.v7.patch
>
>
> If the leader changes for a partition, and a producer is sending messages with ack=0, then messages will be lost, since the producer has no active way of knowing that the leader has changed until its next metadata refresh.
> The broker receiving the message, which is no longer the leader, logs a message like this:
> Produce request with correlation id 7136261 from client  on partition [mytopic,0] failed due to Leader not local for partition [mytopic,0] on broker 508818741
> This is exacerbated by the controlled shutdown mechanism, which forces an immediate leader change.
> A possible solution would be for a broker that receives a message for a partition it no longer leads (when the ack level is 0) to silently forward the message to the current leader.
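
The failure mode described in the issue can be illustrated with a small toy model. This is only a sketch under stated assumptions, not Kafka's actual code: the names Broker, Cluster, Producer, handle_produce and run are hypothetical, the cluster has a single partition, and the ack >= 1 path assumes the producer refreshes its metadata and retries once after an error response.

```python
from dataclasses import dataclass, field


@dataclass
class Broker:
    """Hypothetical broker holding the messages it accepted as leader."""
    broker_id: int
    log: list = field(default_factory=list)


class Cluster:
    """Toy cluster with one partition and one current leader."""

    def __init__(self, broker_ids):
        self.brokers = {b: Broker(b) for b in broker_ids}
        self.leader_id = broker_ids[0]

    def handle_produce(self, broker_id, message):
        """Append on the leader; a non-leader rejects ("Leader not local")."""
        if broker_id != self.leader_id:
            return False  # the broker only logs the failure on its side
        self.brokers[broker_id].log.append(message)
        return True


class Producer:
    """Toy producer that caches the leader it learned from metadata."""

    def __init__(self, cluster, acks):
        self.cluster = cluster
        self.acks = acks
        self.cached_leader = cluster.leader_id  # metadata fetched at startup

    def refresh_metadata(self):
        self.cached_leader = self.cluster.leader_id

    def send(self, message):
        ok = self.cluster.handle_produce(self.cached_leader, message)
        if self.acks == 0:
            return  # fire-and-forget: the result never reaches the producer
        if not ok:
            # Assumption: with acks >= 1 the error response triggers a
            # metadata refresh followed by one retry.
            self.refresh_metadata()
            self.cluster.handle_produce(self.cached_leader, message)


def run(acks):
    """Send four messages around a leader change; return what was stored."""
    cluster = Cluster([1, 2])
    producer = Producer(cluster, acks)
    producer.send("m1")
    cluster.leader_id = 2        # leader change, e.g. a controlled shutdown
    producer.send("m2")          # goes to the stale leader
    producer.send("m3")          # still stale
    producer.refresh_metadata()  # the next periodic metadata refresh
    producer.send("m4")
    return sorted(m for b in cluster.brokers.values() for m in b.log)


if __name__ == "__main__":
    print("acks=0:", run(0))
    print("acks=1:", run(1))
```

In this model, run(0) stores only m1 and m4: the two messages sent between the leader change and the next metadata refresh are dropped without the producer noticing. run(1) stores all four, because the error response lets the producer find the new leader and retry.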



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)