Posted to dev@flink.apache.org by Shivam Sharma <28...@gmail.com> on 2017/12/20 09:57:50 UTC

Facing issue of long running Flink job on Yarn

Hi,

I keep running into this issue with a long-running Flink job on YARN.
The job reads data from Kafka, transforms it, and writes the result back to
Kafka.

My build.sbt is:

val flinkVersion = "1.3.2"
val flinkKafkaConnect = "0.10.2" // defined here but not referenced below

libraryDependencies ++= Seq(
    "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
    "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
    "org.apache.flink" %% "flink-table" % flinkVersion,
    "org.json4s" %% "json4s-native" % "3.5.3",
    "org.json4s" %% "json4s-jackson" % "3.5.3"
)

*Note: One of the nodes in our Kafka cluster went down.*


java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:58)
    at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:75)
    at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:65)
    at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:36)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:44)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:597)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:504)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
    ... 7 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
    at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
    at DataStreamCalcRule$127.processElement(Unknown Source)
    at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:67)
    at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 23 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 35 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:622)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:622)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:622)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 41 more
Caused by: java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:407)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 52 more
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
2017-12-20 05:42:16,008 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Persuasion (b65f972af4866ff0548bf7f0438caf8f) switched from state RUNNING to FAILING.
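
The trace above is a chain of wrapper exceptions, and only the innermost "Caused by" names the actual failure. As a rough sketch of how that innermost cause could be pulled out programmatically, the snippet below follows getCause until it runs out. The object name RootCause and the stand-in exceptions are made up for illustration (the real job would carry Kafka's NotLeaderForPartitionException at the bottom); nothing here comes from the job itself.

```scala
import scala.annotation.tailrec

object RootCause {
  // Follow getCause until there is no further cause; that last
  // exception is the innermost "Caused by" entry of a stack trace.
  @tailrec
  def rootCause(t: Throwable): Throwable = {
    val cause = t.getCause
    if (cause == null) t else rootCause(cause)
  }

  def main(args: Array[String]): Unit = {
    // Stand-in exceptions that mimic the nesting seen in the trace.
    // IllegalStateException is only a placeholder for the Kafka exception.
    val kafkaError = new IllegalStateException(
      "This server is not the leader for that topic-partition.")
    val sinkError = new Exception(
      "Failed to send data to Kafka: " + kafkaError.getMessage, kafkaError)
    val chained = new RuntimeException(
      "Could not forward element to next operator", sinkError)
    val top = new RuntimeException(
      "Exception occurred while processing valve output watermark: ", chained)

    println(rootCause(top).getMessage)
  }
}
```

Read this way, the failure originates in the Kafka producer rather than in the windowing or Table API operators that appear higher up in the trace.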


Thanks

-- 
Shivam Sharma
Data Engineer @ Goibibo
Indian Institute Of Information Technology, Design and Manufacturing
Jabalpur
Mobile No- (+91) 8882114744
Email:- 28shivamsharma@gmail.com
LinkedIn: https://www.linkedin.com/in/28shivamsharma

Re: Facing issue of long running Flink job on Yarn

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

At this point, this seems more like a Kafka question than a Flink question. I think you need to configure high availability for Kafka with ZooKeeper; you can probably find more about this here: https://www.cloudera.com/documentation/kafka/latest/topics/kafka_ha.html
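
Besides broker-side high availability, the producer can be told to retry when a partition leader moves. NotLeaderForPartitionException is a retriable error in the Kafka client, and the 0.10 producer defaults to retries = 0, so a single leader change is enough to fail a send. The following is only a sketch under assumptions: the object name ProducerSettings, the broker addresses, and the chosen values are illustrative and not taken from the job in this thread. The property keys themselves are standard Kafka producer settings.

```scala
import java.util.Properties

object ProducerSettings {
  // Builds Kafka producer properties; the caller supplies the broker list.
  def build(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    // Retry sends that fail with a retriable error such as a leader change
    // (the value 5 is an arbitrary example).
    props.setProperty("retries", "5")
    // Pause between attempts so a new leader can be elected first.
    props.setProperty("retry.backoff.ms", "1000")
    // Wait for acknowledgement from all in-sync replicas.
    props.setProperty("acks", "all")
    props
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical broker addresses, for illustration only.
    val props = build("broker1:9092,broker2:9092,broker3:9092")
    println(props.getProperty("retries"))
  }
}
```

A Properties object like this would typically be handed to the FlinkKafkaProducer010 constructor as its producer configuration. Two caveats: retries can reorder records unless max.in.flight.requests.per.connection is set to 1, and retrying only helps if the topic has a replication factor above 1 so that another broker can take over as leader.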

Best,
Stefan


> On 20.12.2017 at 12:06, Shivam Sharma <28...@gmail.com> wrote:
> 
> Hi Stefan,
> 
> One Kafka node was down, but I want it to restart automatically. How can I
> solve this?
> 
> Thanks
> 
> On Wed, Dec 20, 2017 at 4:24 PM, Stefan Richter <s.richter@data-artisans.com> wrote:
> 
>> Hi,
>> 
>> Did you see that the problem starts from a Kafka exception, "Failed to send
>> data to Kafka: This server is not the leader for that topic-partition."? Is
>> it possible that you had a network issue and the producer could not find
>> the leader broker?
>> 
>> Best,
>> Stefan
>>> java:528)
>>> ... 35 more
>>> Caused by: org.apache.flink.streaming.runtime.tasks.
>>> ExceptionInChainedOperatorException: Could not forward element to next
>>> operator
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$
>> CopyingChainingOutput.pushToOperator(OperatorChain.
>>> java:530)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$
>> CopyingChainingOutput.collect(OperatorChain.
>>> java:503)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$
>> CopyingChainingOutput.collect(OperatorChain.
>>> java:483)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$
>> CountingOutput.collect(AbstractStreamOperator.
>>> java:891)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$
>> CountingOutput.collect(AbstractStreamOperator.
>>> java:869)
>>> at
>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
>> TimestampedCollector.
>>> java:51)
>>> at
>>> org.apache.flink.streaming.api.scala.DataStream$$anon$6$$
>> anonfun$flatMap$1.apply(DataStream.
>>> scala:622)
>>> at
>>> org.apache.flink.streaming.api.scala.DataStream$$anon$6$$
>> anonfun$flatMap$1.apply(DataStream.
>>> scala:622)
>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>> at
>>> org.apache.flink.streaming.api.scala.DataStream$$anon$6.
>> flatMap(DataStream.
>>> scala:622)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamFlatMap.
>> processElement(StreamFlatMap.
>>> java:50)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$
>> CopyingChainingOutput.pushToOperator(OperatorChain.
>>> java:528)
>>> ... 41 more
>>> Caused by: java.lang.Exception: Failed to send data to Kafka: This server
>>> is not the leader for that topic-partition.
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.
>> checkErroneous(FlinkKafkaProducerBase.
>>> java:373)
>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010
>>> .invokeInternal(FlinkKafkaProducer010.java:302)
>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010
>>> .invoke(FlinkKafkaProducer010.java:407)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSink.
>> processElement(StreamSink.
>>> java:41)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$
>> CopyingChainingOutput.pushToOperator(OperatorChain.
>>> java:528)
>>> ... 52 more
>>> 
>>> 
>>> Thanks
>>> 
>>> --
>>> Shivam Sharma
>>> Data Engineer @ Goibibo
>>> Indian Institute Of Information Technology, Design and Manufacturing
>>> Jabalpur
>>> Mobile No- (+91) 8882114744
>>> Email:- 28shivamsharma@gmail.com
>>> LinkedIn:-*https://www.linkedin.com/in/28shivamsharma
>>> <https://www.linkedin.com/in/28shivamsharma>*
>> 
>> 
> 
> 
> -- 
> Shivam Sharma
> Data Engineer @ Goibibo
> Indian Institute Of Information Technology, Design and Manufacturing
> Jabalpur
> Mobile No- (+91) 8882114744
> Email:- 28shivamsharma@gmail.com
> LinkedIn:-*https://www.linkedin.com/in/28shivamsharma
> <https://www.linkedin.com/in/28shivamsharma>*


Re: Facing issue of long running Flink job on Yarn

Posted by Shivam Sharma <28...@gmail.com>.
Hi Stefan,

One Kafka node was down, but I want the job to restart automatically. How
can I solve this?
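
One common way to get automatic restarts is to configure a restart strategy on the execution environment. The following is only a minimal sketch, assuming the job is written against the Scala DataStream API from the build.sbt above; the object name, the checkpoint interval, the attempt count and the delay are illustrative values, not settings taken from this job.

```scala
import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical entry point; only the two env.* calls matter for restarts.
object RestartStrategySketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpoint every 60 s (assumed interval) so a restarted job can
    // resume from its last checkpoint instead of starting from scratch.
    env.enableCheckpointing(60000L)

    // Restart the job up to 10 times, waiting 30 s between attempts
    // (both numbers are assumptions; tune them to how long a broker
    // outage usually lasts in your cluster).
    env.setRestartStrategy(
      RestartStrategies.fixedDelayRestart(10, Time.of(30, TimeUnit.SECONDS)))

    // ... define the Kafka source, the transformations and the Kafka sink
    // here, then call env.execute(...) as the job already does.
  }
}
```

The same fixed-delay strategy can also be set cluster-wide in flink-conf.yaml instead of in code, which may be preferable when several jobs share one YARN session.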

Thanks

On Wed, Dec 20, 2017 at 4:24 PM, Stefan Richter <s.richter@data-artisans.com> wrote:

> Hi,
>
> did you see that the problem starts from a Kafka exception „Failed to send
> data to Kafka: This server is not the leader for that topic-partition.“? Is
> it possible that you had a network issue and the producer could not find
> the leader broker?
>
> Best,
> Stefan


-- 
Shivam Sharma
Data Engineer @ Goibibo
Indian Institute Of Information Technology, Design and Manufacturing
Jabalpur
Mobile No- (+91) 8882114744
Email:- 28shivamsharma@gmail.com
LinkedIn: https://www.linkedin.com/in/28shivamsharma

Re: Facing issue of long running Flink job on Yarn

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

Did you see that the problem starts from a Kafka exception, "Failed to send data to Kafka: This server is not the leader for that topic-partition."? Is it possible that you had a network issue and the producer could not find the leader broker?
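
If the exception comes from a leader change after a broker goes down, letting the Kafka producer retry failed sends may be enough to ride it out, since Kafka 0.10 producers do not retry by default. Below is a minimal sketch of such producer settings; the object and method names, the broker list, and the retry values are assumptions chosen for illustration.

```scala
import java.util.Properties

// Hypothetical helper that builds the producer configuration.
object ProducerRetrySketch {

  /** Builds Kafka producer settings; the caller supplies the broker list. */
  def producerProps(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    // Retry failed sends instead of failing on the first error
    // (the 0.10 producer default for "retries" is 0).
    props.setProperty("retries", "5")
    // Pause between retries so that a new partition leader can be
    // elected in the meantime (1000 ms is an assumed value).
    props.setProperty("retry.backoff.ms", "1000")
    props
  }

  def main(args: Array[String]): Unit = {
    // "broker1:9092,broker2:9092" is a placeholder broker list.
    val props = producerProps("broker1:9092,broker2:9092")
    println(props.getProperty("retries"))
  }
}
```

These properties would be handed to the FlinkKafkaProducer010 constructor that accepts a topic, a serialization schema and a Properties object. Note that with retries enabled, records can be reordered unless max.in.flight.requests.per.connection is set to 1.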

Best,
Stefan 

> On 20.12.2017 at 10:57, Shivam Sharma <28...@gmail.com> wrote:
> 
> ​Hi ,
> 
> I am always facing this issue with Flink job on yarn.
> Basically I am reading data from kafka, transforming it & putting in kafka
> only.
> 
> My build.sbt is:
> 
> val flinkVersion = "1.3.2"
> val flinkKafkaConnect = "0.10.2"
> 
> libraryDependencies ++= Seq(
>    "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
>    "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
>    "org.apache.flink" %% "flink-table" % flinkVersion,
>    "org.json4s" %% "json4s-native" % "3.5.3",
>    "org.json4s" %% "json4s-jackson" % "3.5.3"
> 
> )
> 
> *Note: One of the node in our kafka Cluster ​goes down.*
> 
> 
> java.lang.RuntimeException: Exception occurred while processing valve
> output watermark:
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.
> java:289)
> at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.
> java:173)
> at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.
> java:108)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.
> java:188)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.
> java:69)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.
> java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
> java:530)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
> java:503)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
> java:483)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
> java:891)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
> java:869)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.
> java:51)
> at
> org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.
> scala:58)
> at
> org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.
> scala:75)
> at
> org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.
> scala:65)
> at
> org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.
> scala:36)
> at
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.
> java:44)
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.
> java:597)
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.
> java:504)
> at
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.
> java:275)
> at
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.
> java:107)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.
> java:946)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.
> java:286)
> ... 7 more
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
> java:530)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
> java:503)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
> java:483)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
> java:891)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
> java:869)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.
> java:51)
> at
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.
> scala:37)
> at
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.
> scala:28)
> at DataStreamCalcRule$127.processElement(Unknown Source)
> at
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.
> scala:67)
> at
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.
> scala:35)
> at
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.
> java:66)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
> java:528)
> ... 23 more
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
> java:530)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
> java:503)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
> java:483)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
> java:891)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
> java:869)
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.
> java:41)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
> java:528)
> ... 35 more
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
> java:530)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
> java:503)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
> java:483)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
> java:891)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
> java:869)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.
> java:51)
> at
> org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.
> scala:622)
> at
> org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.
> scala:622)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at
> org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.
> scala:622)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.
> java:50)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
> java:528)
> ... 41 more
> Caused by: java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:407)
> at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> ... 52 more
> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> 2017-12-20 05:42:16,008 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Persuasion (b65f972af4866ff0548bf7f0438caf8f) switched from state RUNNING to FAILING.
> java.lang.RuntimeException: Exception occurred while processing valve
> output watermark:
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.
> java:289)
> at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.
> java:173)
> at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.
> java:108)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.
> java:188)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.
> java:69)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.
> java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
> java:530)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
> java:503)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
> java:483)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
> java:891)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
> java:869)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.
> java:51)
> at
> org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.
> scala:58)
> at
> org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.
> scala:75)
> at
> org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.
> scala:65)
> at
> org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.
> scala:36)
> at
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.
> java:44)
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.
> java:597)
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.
> java:504)
> at
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.
> java:275)
> at
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.
> java:107)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.
> java:946)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.
> java:286)
> ... 7 more
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
> java:530)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
> java:503)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
> java:483)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
> java:891)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
> java:869)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.
> java:51)
> at
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.
> scala:37)
> at
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.
> scala:28)
> at DataStreamCalcRule$127.processElement(Unknown Source)
> at
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.
> scala:67)
> at
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.
> scala:35)
> at
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.
> java:66)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
> java:528)
> ... 23 more
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
> java:530)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
> java:503)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
> java:483)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
> java:891)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
> java:869)
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.
> java:41)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
> java:528)
> ... 35 more
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
> java:530)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
> java:503)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
> java:483)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
> java:891)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
> java:869)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.
> java:51)
> at
> org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.
> scala:622)
> at
> org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.
> scala:622)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at
> org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.
> scala:622)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.
> java:50)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
> java:528)
> ... 41 more
> Caused by: java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:407)
> at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> ... 52 more
> 
> 
> Thanks
> 
> -- 
> Shivam Sharma
> Data Engineer @ Goibibo
> Indian Institute Of Information Technology, Design and Manufacturing
> Jabalpur
> Mobile No- (+91) 8882114744
> Email:- 28shivamsharma@gmail.com
> LinkedIn:- https://www.linkedin.com/in/28shivamsharma