Posted to user@flink.apache.org by Matthias Pohl <ma...@ververica.com> on 2021/03/01 12:22:02 UTC

Re: Flink application kept restarting

Thanks for providing this information, Rainie. Are other issues documented
in the logs besides the TimeoutException in the JM logs which you already
shared? For now, it looks like there was a connection problem between
the TaskManager and the JobManager that caused the shutdown of the operator,
resulting in the NetworkBufferPool being destroyed. For this scenario I
would expect other failures to occur besides the ones you shared.

Best,
Matthias

On Fri, Feb 26, 2021 at 8:28 PM Rainie Li <ra...@pinterest.com> wrote:

> Thank you, Matthias.
> It’s version 1.9.
>
> Best regards
> Rainie
>
> On Fri, Feb 26, 2021 at 6:33 AM Matthias Pohl <ma...@ververica.com>
> wrote:
>
>> Hi Rainie,
>> the network buffer pool was destroyed for some reason. This happens when
>> the NettyShuffleEnvironment gets closed which is triggered when an operator
>> is cleaned up, for instance. Maybe, the timeout in the metric system caused
>> this. But I'm not sure how this is connected. I'm gonna add Chesnay to this
>> conversation hoping that he can give more insights.
>>
>> If I may ask: What Flink version are you using?
>>
>> Thanks,
>> Matthias
>>
>>
>> On Fri, Feb 26, 2021 at 8:39 AM Rainie Li <ra...@pinterest.com> wrote:
>>
>>> Hi All,
>>>
>>> Our Flink application kept restarting; it makes a lot of RPC calls to a
>>> dependency service.
>>>
>>> *We saw this exception from failed task manager log: *
>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>> Could not forward element to next operator
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>> at
>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>> at
>>> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:179)
>>> at
>>> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:173)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark(AbstractFetcher.java:436)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:402)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)
>>> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
>>> at
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>>> at
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>>> at
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>> at
>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>> at
>>> com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:203)
>>> at
>>> com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:189)
>>> at
>>> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>> ... 23 more
>>> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>>> at
>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:239)
>>> at
>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:213)
>>> at
>>> org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:181)
>>> at
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:256)
>>> at
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:249)
>>> at
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:169)
>>> at
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:154)
>>> at
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
>>> at
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>>> ... 32 more
>>>
>>> *We also saw this exception from Job manager log:*
>>> 2021-02-25 21:32:42,874 ERROR akka.remote.Remoting
>>>                    - Association to [akka.tcp://flink-metrics@host:38593]
>>> with UID [-1261564990] irrecoverably failed. Quarantining address.
>>> java.util.concurrent.TimeoutException: Remote system has been silent for
>>> too long. (more than 48.0 hours)
>>> at
>>> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:386)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>> at
>>> akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at
>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at
>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> This app has been running fine for a month.
>>> Any suggestions on what could cause the issue? Any suggestions on how to
>>> debug it?
>>> Appreciate all advice.
>>>
>>> Thanks
>>> Best regards
>>> Rainie
>>>
>>

Re: Flink application kept restarting

Posted by Rainie Li <ra...@pinterest.com>.
Thanks, Julian. I appreciate your help.
I will check the task manager logs and decide which approach to take.

Best regards
Rainie


On Thu, Mar 4, 2021 at 3:04 PM Jaffe, Julian <Ju...@activision.com>
wrote:

> Hey Rainie,
>
>
>
> Kafka internally attempts to retry topic metadata fetches if possible. If
> you think the root cause was just due to network congestion or the like,
> you might want to look into increasing `request.timeout.ms`. Because of
> the internal retry attempts, however, this exception usually means that the
> client was unable to connect to the brokers in a more permanent way. The
> usual culprits are SSL topics with plaintext consumers or vice versa, SSL
> topics where the client was unable to fetch the necessary credentials,
> incorrect bootstrap servers configs, and cluster maintenance. Since your
> job was running successfully for a long time before crashing, I’d check the
> task manager logs for the period just before the error for any warnings or
> errors from Kafka to see if they shed light. If you’re dynamically loading
> SSL certs or using cnames to provide stable bootstrap server identifiers,
> it can be non-obvious if your config changed mid-operation, but often times
> some part of the stack will log a warning message or swallowed exception.
> If you have verbose logging, you might also be logging the Kafka connection
> properties each time they’re regenerated.
>
>
>
> TL;DR:
>
>
>
> If you think the issue is due to high latency in communicating with your
> Kafka cluster, increase your configured timeouts.
>
>
>
> If you think the issue is due to (possibly temporary) dynamic config
> changes, check your error handling. Retrying the fetch from the Flink side
> will only help if the cause of the timeout is very transient.
>
>
>
> Julian
>
>
>
> *From: *Rainie Li <ra...@pinterest.com>
> *Date: *Thursday, March 4, 2021 at 1:49 PM
> *To: *"matthias@ververica.com" <ma...@ververica.com>
> *Cc: *user <us...@flink.apache.org>, Chesnay Schepler <ch...@apache.org>
> *Subject: *Re: Flink application kept restarting
>
>
>
> Hi Matthias,
>
>
>
> Do you have any suggestions to handle timeout issues when fetching data
> from a Kafka topic?
>
> I am thinking of adding retry logic to the Flink job; not sure if this is
> the right direction.
>
>
>
> Thanks again
>
> Best regards
>
> Rainie
>
>
>
> On Wed, Mar 3, 2021 at 12:24 AM Matthias Pohl <ma...@ververica.com>
> wrote:
>
> Hi Rainie,
>
> in general, a destroyed buffer pool usually means that some other
> exception occurred that caused the task to fail and in the process of
> failure handling the operator-related network buffer is destroyed. That
> causes the "java.lang.RuntimeException: Buffer pool is destroyed." in your
> case. It looks like you had some timeout problem while fetching data from a
> Kafka topic.
>
>
>
> Matthias
>
>
>
> On Tue, Mar 2, 2021 at 10:39 AM Rainie Li <ra...@pinterest.com> wrote:
>
> Thanks for checking, Matthias.
>
>
>
> I have another Flink job that failed last weekend with the same buffer
> pool destroyed error. This job is also running version 1.9.
>
> Here is the error I found from the task manager log. Any suggestion what
> is the root cause and how to fix it?
>
>
>
> 2021-02-28 00:54:45,943 WARN
>  org.apache.flink.streaming.runtime.tasks.StreamTask           - Error
> while canceling task.
> java.lang.RuntimeException: Buffer pool is destroyed.
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
> at
> com.pinterest.xenon.unified.api191.SynchronousKafkaConsumer191$1.emitRecordWithTimestamp(SynchronousKafkaConsumer191.java:107)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> --
> at
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:175)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
> at java.lang.Thread.run(Thread.java:748)
> Caused by:
> org.apache.flink.runtime.io.network.partition.ProducerFailedException:
> org.apache.kafka.common.errors.TimeoutException: Timeout expired while
> fetching topic metadata
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:221)
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:107)
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:170)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:366)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.userEventTriggered(DefaultChannelPipeline.java:1452)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireUserEventTriggered(DefaultChannelPipeline.java:959)
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.lambda$notifyReaderNonEmpty$0(PartitionRequestQueue.java:86)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474)
> ... 2 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout
> expired while fetching topic metadata
>
>
>
>
>
> Thanks again!
>
> Best regards
>
> Rainie
>
>
>
> On Mon, Mar 1, 2021 at 4:38 AM Matthias Pohl <ma...@ververica.com>
> wrote:
>
> Another question is: The timeout of 48 hours sounds strange. There should
> have been some other system noticing the connection problem earlier
> assuming that you have a reasonably low heartbeat interval configured.
>
>
>
> Matthias
>
>
>
> On Mon, Mar 1, 2021 at 1:22 PM Matthias Pohl <ma...@ververica.com>
> wrote:
>
> Thanks for providing this information, Rainie. Are other issues documented
> in the logs besides the TimeoutException in the JM logs which you already
> shared? For now, it looks like there was a connection problem between
> the TaskManager and the JobManager that caused the shutdown of the operator,
> resulting in the NetworkBufferPool being destroyed. For this scenario I
> would expect other failures to occur besides the ones you shared.
>
>
>
> Best,
> Matthias
>
>
>
> On Fri, Feb 26, 2021 at 8:28 PM Rainie Li <ra...@pinterest.com> wrote:
>
> Thank you, Matthias.
>
> It’s version 1.9.
>
>
>
> Best regards
>
> Rainie
>
>
>
> On Fri, Feb 26, 2021 at 6:33 AM Matthias Pohl <ma...@ververica.com>
> wrote:
>
> Hi Rainie,
>
> the network buffer pool was destroyed for some reason. This happens when
> the NettyShuffleEnvironment gets closed which is triggered when an operator
> is cleaned up, for instance. Maybe, the timeout in the metric system caused
> this. But I'm not sure how this is connected. I'm gonna add Chesnay to this
> conversation hoping that he can give more insights.
>
>
>
> If I may ask: What Flink version are you using?
>
>
>
> Thanks,
> Matthias
>
>
>
>
>
> On Fri, Feb 26, 2021 at 8:39 AM Rainie Li <ra...@pinterest.com> wrote:
>
> Hi All,
>
>
>
> Our Flink application kept restarting; it makes a lot of RPC calls to a
> dependency service.
>
>
>
> *We saw this exception from failed task manager log: *
>
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:179)
> at
> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:173)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark(AbstractFetcher.java:436)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:402)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)
> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:203)
> at
> com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:189)
> at
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
> ... 23 more
> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
> at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:239)
> at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:213)
> at
> org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:181)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:256)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:249)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:169)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:154)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> ... 32 more
>
>
>
> *We also saw this exception from Job manager log:*
>
> 2021-02-25 21:32:42,874 ERROR akka.remote.Remoting
>                  - Association to [akka.tcp://flink-metrics@host:38593]
> with UID [-1261564990] irrecoverably failed. Quarantining address.
> java.util.concurrent.TimeoutException: Remote system has been silent for
> too long. (more than 48.0 hours)
> at
> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:386)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
> This app has been running fine for a month.
>
> Any suggestions on what could cause the issue? Any suggestions on how to debug
> it?
>
> Appreciate all advice.
>
>
>
> Thanks
>
> Best regards
>
> Rainie
>
>

Re: Flink application kept restarting

Posted by "Jaffe, Julian" <Ju...@activision.com>.
Hey Rainie,

Kafka internally attempts to retry topic metadata fetches if possible. If you think the root cause was just due to network congestion or the like, you might want to look into increasing `request.timeout.ms`. Because of the internal retry attempts, however, this exception usually means that the client was unable to connect to the brokers in a more permanent way. The usual culprits are SSL topics with plaintext consumers or vice versa, SSL topics where the client was unable to fetch the necessary credentials, incorrect bootstrap servers configs, and cluster maintenance. Since your job was running successfully for a long time before crashing, I’d check the task manager logs for the period just before the error for any warnings or errors from Kafka to see if they shed light. If you’re dynamically loading SSL certs or using cnames to provide stable bootstrap server identifiers, it can be non-obvious if your config changed mid-operation, but often times some part of the stack will log a warning message or swallowed exception. If you have verbose logging, you might also be logging the Kafka connection properties each time they’re regenerated.

TL;DR:

If you think the issue is due to high latency in communicating with your Kafka cluster, increase your configured timeouts.

If you think the issue is due to (possibly temporary) dynamic config changes, check your error handling. Retrying the fetch from the Flink side will only help if the cause of the timeout is very transient.
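
To make the first point concrete, here is a minimal sketch of where that property would go, assuming the universal FlinkKafkaConsumer you appear to be using (the topic name, broker list, group id, and timeout value are placeholders, not recommendations):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaTimeoutSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker-1:9092");  // placeholder brokers
        props.setProperty("group.id", "my-consumer-group");       // placeholder group id
        // Give metadata fetches more headroom when the cluster is slow to respond.
        props.setProperty("request.timeout.ms", "60000");         // placeholder value

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

        DataStream<String> stream = env.addSource(consumer);
        stream.print();
        env.execute("kafka-timeout-sketch");
    }
}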

Julian

From: Rainie Li <ra...@pinterest.com>
Date: Thursday, March 4, 2021 at 1:49 PM
To: "matthias@ververica.com" <ma...@ververica.com>
Cc: user <us...@flink.apache.org>, Chesnay Schepler <ch...@apache.org>
Subject: Re: Flink application kept restarting

Hi Matthias,

Do you have any suggestions to handle timeout issues when fetching data from a Kafka topic?
I am thinking of adding retry logic to the Flink job; not sure if this is the right direction.

Thanks again
Best regards
Rainie

On Wed, Mar 3, 2021 at 12:24 AM Matthias Pohl <ma...@ververica.com>> wrote:
Hi Rainie,
in general, a destroyed buffer pool usually means that some other exception occurred that caused the task to fail and, in the process of failure handling, the operator-related network buffer is destroyed. That causes the "java.lang.RuntimeException: Buffer pool is destroyed." in your case. It looks like you had some timeout problem while fetching data from a Kafka topic.

Matthias

On Tue, Mar 2, 2021 at 10:39 AM Rainie Li <ra...@pinterest.com>> wrote:
Thanks for checking, Matthias.

I have another Flink job that failed last weekend with the same buffer pool destroyed error. This job is also running version 1.9.
Here is the error I found from the task manager log. Any suggestion what is the root cause and how to fix it?

2021-02-28 00:54:45,943 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - Error while canceling task.
java.lang.RuntimeException: Buffer pool is destroyed.
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at com.pinterest.xenon.unified.api191.SynchronousKafkaConsumer191$1.emitRecordWithTimestamp(SynchronousKafkaConsumer191.java:107)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
--
at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:175)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:221)
at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:107)
at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:170)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:366)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.userEventTriggered(DefaultChannelPipeline.java:1452)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireUserEventTriggered(DefaultChannelPipeline.java:959)
at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.lambda$notifyReaderNonEmpty$0(PartitionRequestQueue.java:86)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474)
... 2 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata


Thanks again!
Best regards
Rainie

On Mon, Mar 1, 2021 at 4:38 AM Matthias Pohl <ma...@ververica.com>> wrote:
Another question is: The timeout of 48 hours sounds strange. There should have been some other system noticing the connection problem earlier assuming that you have a reasonably low heartbeat interval configured.

Matthias

On Mon, Mar 1, 2021 at 1:22 PM Matthias Pohl <ma...@ververica.com>> wrote:
Thanks for providing this information, Rainie. Are other issues documented in the logs besides the TimeoutException in the JM logs which you already shared? For now, it looks like there was a connection problem between the TaskManager and the JobManager that caused the shutdown of the operator, resulting in the NetworkBufferPool being destroyed. For this scenario I would expect other failures to occur besides the ones you shared.

Best,
Matthias

On Fri, Feb 26, 2021 at 8:28 PM Rainie Li <ra...@pinterest.com>> wrote:
Thank you, Matthias.
It’s version 1.9.

Best regards
Rainie

On Fri, Feb 26, 2021 at 6:33 AM Matthias Pohl <ma...@ververica.com>> wrote:
Hi Rainie,
the network buffer pool was destroyed for some reason. This happens when the NettyShuffleEnvironment gets closed which is triggered when an operator is cleaned up, for instance. Maybe, the timeout in the metric system caused this. But I'm not sure how this is connected. I'm gonna add Chesnay to this conversation hoping that he can give more insights.

If I may ask: What Flink version are you using?

Thanks,
Matthias


On Fri, Feb 26, 2021 at 8:39 AM Rainie Li <ra...@pinterest.com>> wrote:
Hi All,

Our Flink application kept restarting; it makes a lot of RPC calls to a dependency service.

We saw this exception from failed task manager log:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:179)
at com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:173)
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark(AbstractFetcher.java:436)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:402)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:203)
at com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:189)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
... 23 more
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:239)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:213)
at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:181)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:256)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:249)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:169)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:154)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
... 32 more

We also saw this exception from Job manager log:
2021-02-25 21:32:42,874 ERROR akka.remote.Remoting                                          - Association to [akka.tcp://flink-metrics@host:38593] with UID [-1261564990] irrecoverably failed. Quarantining address.
java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours)
at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:386)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

This app has been running fine for a month.
Any suggestions on what could cause the issue? Any suggestions on how to debug it?
Appreciate all advice.

Thanks
Best regards
Rainie

Re: Flink application kept restarting

Posted by Rainie Li <ra...@pinterest.com>.
Hi Matthias,

Do you have any suggestions to handle timeout issues when fetching data
from a Kafka topic?
I am thinking of adding retry logic to the Flink job; not sure if this is
the right direction.
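
To be concrete, this minimal sketch is roughly what I have in mind, i.e. letting Flink's restart strategy retry the whole job a few times rather than hand-rolling retries around the Kafka fetch (the attempt count and delay below are placeholder values):

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetrySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Restart the whole job up to 5 times, 30 seconds apart, so a transient
        // Kafka timeout fails the attempt instead of the job as a whole.
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(5, Time.of(30, TimeUnit.SECONDS)));
        // Stand-in for the real Kafka source and pipeline.
        env.fromElements("placeholder").print();
        env.execute("retry-sketch");
    }
}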

Thanks again
Best regards
Rainie

On Wed, Mar 3, 2021 at 12:24 AM Matthias Pohl <ma...@ververica.com>
wrote:

> Hi Rainie,
> in general, a destroyed buffer pool usually means that some other
> exception occurred that caused the task to fail and in the process of
> failure handling the operator-related network buffer is destroyed. That
> causes the "java.lang.RuntimeException: Buffer pool is destroyed." in your
> case. It looks like you had some timeout problem while fetching data from a
> Kafka topic.
>
> Matthias
>
> On Tue, Mar 2, 2021 at 10:39 AM Rainie Li <ra...@pinterest.com> wrote:
>
>> Thanks for checking, Matthias.
>>
>> I have another Flink job that failed last weekend with the same buffer
>> pool destroyed error. This job is also running version 1.9.
>> Here is the error I found from the task manager log. Any suggestion what
>> is the root cause and how to fix it?
>>
>> 2021-02-28 00:54:45,943 WARN
>>  org.apache.flink.streaming.runtime.tasks.StreamTask           - Error
>> while canceling task.
>> java.lang.RuntimeException: Buffer pool is destroyed.
>> at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>> at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>> at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>> at
>> com.pinterest.xenon.unified.api191.SynchronousKafkaConsumer191$1.emitRecordWithTimestamp(SynchronousKafkaConsumer191.java:107)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>> --
>> at
>> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:175)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
>> at
>> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
>> at
>> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by:
>> org.apache.flink.runtime.io.network.partition.ProducerFailedException:
>> org.apache.kafka.common.errors.TimeoutException: Timeout expired while
>> fetching topic metadata
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:221)
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:107)
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:170)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>> at
>> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:366)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.userEventTriggered(DefaultChannelPipeline.java:1452)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireUserEventTriggered(DefaultChannelPipeline.java:959)
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.lambda$notifyReaderNonEmpty$0(PartitionRequestQueue.java:86)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474)
>> ... 2 more
>> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout
>> expired while fetching topic metadata
>>
>>
>> Thanks again!
>> Best regards
>> Rainie
>>
>> On Mon, Mar 1, 2021 at 4:38 AM Matthias Pohl <ma...@ververica.com>
>> wrote:
>>
>>> Another question is: The timeout of 48 hours sounds strange. There
>>> should have been some other system noticing the connection problem earlier
>>> assuming that you have a reasonably low heartbeat interval configured.
>>>
>>> Matthias
>>>
>>> On Mon, Mar 1, 2021 at 1:22 PM Matthias Pohl <ma...@ververica.com>
>>> wrote:
>>>
>>>> Thanks for providing this information, Rainie. Are other issues
>>>> documented in the logs besides the TimeoutException in the JM logs which
>>>> you already shared? For now, it looks like there was a connection
>>>> problem between the TaskManager and the JobManager that caused the shutdown
>>>> of the operator resulting in the NetworkBufferPool to be destroyed. For
>>>> this scenario I would expect other failures to occur besides the ones you
>>>> shared.
>>>>
>>>> Best,
>>>> Matthias
>>>>
>>>> On Fri, Feb 26, 2021 at 8:28 PM Rainie Li <ra...@pinterest.com>
>>>> wrote:
>>>>
>>>>> Thank you, Matthias.
>>>>> It’s version 1.9.
>>>>>
>>>>> Best regards
>>>>> Rainie
>>>>>
>>>>> On Fri, Feb 26, 2021 at 6:33 AM Matthias Pohl <ma...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Rainie,
>>>>>> the network buffer pool was destroyed for some reason. This happens
>>>>>> when the NettyShuffleEnvironment gets closed which is triggered when an
>>>>>> operator is cleaned up, for instance. Maybe, the timeout in the metric
>>>>>> system caused this. But I'm not sure how this is connected. I'm gonna add
>>>>>> Chesnay to this conversation hoping that he can give more insights.
>>>>>>
>>>>>> If I may ask: What Flink version are you using?
>>>>>>
>>>>>> Thanks,
>>>>>> Matthias
>>>>>>
>>>>>>
>>>>>> On Fri, Feb 26, 2021 at 8:39 AM Rainie Li <ra...@pinterest.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> Our flink application kept restarting and it did lots of RPC calls
>>>>>>> to a dependency service.
>>>>>>>
>>>>>>> *We saw this exception from failed task manager log: *
>>>>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>>>>>> Could not forward element to next operator
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>>> at
>>>>>>> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:179)
>>>>>>> at
>>>>>>> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:173)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark(AbstractFetcher.java:436)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:402)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)
>>>>>>> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>>> at
>>>>>>> com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:203)
>>>>>>> at
>>>>>>> com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:189)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>>>> ... 23 more
>>>>>>> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>>>>>>> at
>>>>>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:239)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:213)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:181)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:256)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:249)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:169)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:154)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>>>>>>> ... 32 more
>>>>>>>
>>>>>>> *We also saw this exception from Job manager log:*
>>>>>>> 2021-02-25 21:32:42,874 ERROR akka.remote.Remoting
>>>>>>>                        - Association to [akka.tcp://flink-metrics@host:38593]
>>>>>>> with UID [-1261564990] irrecoverably failed. Quarantining address.
>>>>>>> java.util.concurrent.TimeoutException: Remote system has been silent
>>>>>>> for too long. (more than 48.0 hours)
>>>>>>> at
>>>>>>> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:386)
>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>>>> at
>>>>>>> akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207)
>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>> at
>>>>>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>> at
>>>>>>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>> at
>>>>>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>
>>>>>>> This app has been running fine for a month.
>>>>>>> Any suggestion what could cause the issue? Any suggestions on how to
>>>>>>> debug it?
>>>>>>> Appreciated all advice.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best regards
>>>>>>> Rainie
>>>>>>>
>>>>>>

Re: Flink application kept restarting

Posted by Rainie Li <ra...@pinterest.com>.
I see.
Thank you for the explanation.

Best regards
Rainie

On Wed, Mar 3, 2021 at 12:24 AM Matthias Pohl <ma...@ververica.com>
wrote:

> Hi Rainie,
> in general, a destroyed buffer pool means that some other exception
> occurred first and caused the task to fail; as part of the failure
> handling, the operator-related network buffers are released. That is what
> produces the "java.lang.RuntimeException: Buffer pool is destroyed." in
> your case. The root cause here looks like a timeout while fetching topic
> metadata from Kafka.
>
> Matthias

Re: Flink application kept restarting

Posted by Matthias Pohl <ma...@ververica.com>.
Hi Rainie,
in general, a destroyed buffer pool means that some other exception
occurred first and caused the task to fail; as part of the failure
handling, the operator-related network buffers are released. That is what
produces the "java.lang.RuntimeException: Buffer pool is destroyed." in
your case. The root cause here looks like a timeout while fetching topic
metadata from Kafka.
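
A minimal sketch of where such a timeout could be tuned, assuming a plain
FlinkKafkaConsumer from the universal Kafka connector (the consumer wrapper
classes in your stack traces would presumably forward the same properties);
the broker list, topic, group id and timeout values are placeholders, and
raising client timeouts only helps if the brokers were slow rather than
unreachable:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaTimeoutTuningSketch {

    // Builds a consumer whose Kafka client gets more time for metadata requests
    // before failing with "Timeout expired while fetching topic metadata".
    public static FlinkKafkaConsumer<String> buildConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker-1:9092");   // placeholder
        props.setProperty("group.id", "my-consumer-group");        // placeholder
        props.setProperty("request.timeout.ms", "60000");          // per-request timeout
        props.setProperty("default.api.timeout.ms", "120000");     // metadata / API calls
        return new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
    }
}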

Matthias


Re: Flink application kept restarting

Posted by Rainie Li <ra...@pinterest.com>.
Thanks for checking, Matthias.

I have another Flink job that failed last weekend with the same "buffer
pool is destroyed" error. This job is also running version 1.9.
Here is the error I found in the task manager log. Any suggestions on what
the root cause is and how to fix it?

2021-02-28 00:54:45,943 WARN
 org.apache.flink.streaming.runtime.tasks.StreamTask           - Error
while canceling task.
java.lang.RuntimeException: Buffer pool is destroyed.
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at
com.pinterest.xenon.unified.api191.SynchronousKafkaConsumer191$1.emitRecordWithTimestamp(SynchronousKafkaConsumer191.java:107)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
--
at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:175)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
at java.lang.Thread.run(Thread.java:748)
Caused by:
org.apache.flink.runtime.io.network.partition.ProducerFailedException:
org.apache.kafka.common.errors.TimeoutException: Timeout expired while
fetching topic metadata
at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:221)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:107)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:170)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:366)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.userEventTriggered(DefaultChannelPipeline.java:1452)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireUserEventTriggered(DefaultChannelPipeline.java:959)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.lambda$notifyReaderNonEmpty$0(PartitionRequestQueue.java:86)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474)
... 2 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired
while fetching topic metadata
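
(As an aside on the restart behaviour itself: with checkpointing enabled the
job restarts essentially indefinitely by default. A sketch with placeholder
values of how the restarts could be bounded, so that a persistent metadata
timeout eventually fails the job visibly instead of looping -- not a fix for
the Kafka timeout itself:)

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedRestartSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Placeholder policy: give up after 5 attempts, waiting 30 seconds between
        // them, so a persistent failure surfaces as a job failure.
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(5, Time.of(30, TimeUnit.SECONDS)));
        // ... sources, operators and sinks would be defined here ...
        // env.execute("bounded-restart-job");
    }
}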


Thanks again!
Best regards
Rainie

On Mon, Mar 1, 2021 at 4:38 AM Matthias Pohl <ma...@ververica.com> wrote:

> Another question is: The timeout of 48 hours sounds strange. There should
> have been some other system noticing the connection problem earlier
> assuming that you have a reasonably low heartbeat interval configured.
>
> Matthias
>
> On Mon, Mar 1, 2021 at 1:22 PM Matthias Pohl <ma...@ververica.com>
> wrote:
>
>> Thanks for providing this information, Rainie. Are other issues
>> documented in the logs besides the TimeoutException in the JM logs which
>> you already shared? For now, it looks like that there was a connection
>> problem between the TaskManager and the JobManager that caused the shutdown
>> of the operator resulting in the NetworkBufferPool to be destroyed. For
>> this scenario I would expect other failures to occur besides the ones you
>> shared.
>>
>> Best,
>> Matthias
>>
>> On Fri, Feb 26, 2021 at 8:28 PM Rainie Li <ra...@pinterest.com> wrote:
>>
>>> Thank you Mattias.
>>> It’s version 1.9.
>>>
>>> Best regards
>>> Rainie
>>>
>>> On Fri, Feb 26, 2021 at 6:33 AM Matthias Pohl <ma...@ververica.com>
>>> wrote:
>>>
>>>> Hi Rainie,
>>>> the network buffer pool was destroyed for some reason. This happens
>>>> when the NettyShuffleEnvironment gets closed which is triggered when an
>>>> operator is cleaned up, for instance. Maybe, the timeout in the metric
>>>> system caused this. But I'm not sure how this is connected. I'm gonna add
>>>> Chesnay to this conversation hoping that he can give more insights.
>>>>
>>>> If I may ask: What Flink version are you using?
>>>>
>>>> Thanks,
>>>> Matthias
>>>>
>>>>
>>>> On Fri, Feb 26, 2021 at 8:39 AM Rainie Li <ra...@pinterest.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> Our flink application kept restarting and it did lots of RPC calls to
>>>>> a dependency service.
>>>>>
>>>>> *We saw this exception from failed task manager log: *
>>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>>>> Could not forward element to next operator
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>> at
>>>>> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:179)
>>>>> at
>>>>> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:173)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>>>>> at
>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark(AbstractFetcher.java:436)
>>>>> at
>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:402)
>>>>> at
>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>>>>> at
>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>>>>> at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)
>>>>> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>> at
>>>>> com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:203)
>>>>> at
>>>>> com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:189)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>> ... 23 more
>>>>> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>>>>> at
>>>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:239)
>>>>> at
>>>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:213)
>>>>> at
>>>>> org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:181)
>>>>> at
>>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:256)
>>>>> at
>>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:249)
>>>>> at
>>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:169)
>>>>> at
>>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:154)
>>>>> at
>>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>>>>> ... 32 more
>>>>>
>>>>> *We also saw this exception from Job manager log:*
>>>>> 2021-02-25 21:32:42,874 ERROR akka.remote.Remoting
>>>>>                      - Association to [akka.tcp://flink-metrics@host:38593]
>>>>> with UID [-1261564990] irrecoverably failed. Quarantining address.
>>>>> java.util.concurrent.TimeoutException: Remote system has been silent
>>>>> for too long. (more than 48.0 hours)
>>>>> at
>>>>> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:386)
>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>> at
>>>>> akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207)
>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>> at
>>>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>> at
>>>>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>> at
>>>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>
>>>>> This app has been running fine for a month.
>>>>> Any suggestions on what could cause the issue, or on how to debug it?
>>>>> All advice is appreciated.
>>>>>
>>>>> Thanks
>>>>> Best regards
>>>>> Rainie
>>>>>
>>>>

Re: Flink application kept restarting

Posted by Matthias Pohl <ma...@ververica.com>.
Another question: the 48-hour timeout sounds strange. Some other component
should have noticed the connection problem much earlier, assuming you have a
reasonably low heartbeat interval configured.
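
For reference, here is a minimal sketch of the heartbeat-related settings in
flink-conf.yaml; the keys exist in Flink 1.9, and the values shown are (to my
recollection) the defaults, listed only to illustrate the expected detection
time, not as a recommendation:

  heartbeat.interval: 10000   # ms between heartbeat requests exchanged by JobManager and TaskManagers (default)
  heartbeat.timeout: 50000    # ms without a heartbeat response before the peer is considered lost (default)
  akka.ask.timeout: 10 s      # timeout for RPC ask calls between the components (default)

With values in that range, a dead TaskManager connection should surface within
about a minute rather than after 48 hours of silence.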

Matthias

On Mon, Mar 1, 2021 at 1:22 PM Matthias Pohl <ma...@ververica.com> wrote:

> Thanks for providing this information, Rainie. Are other issues documented
> in the logs besides the TimeoutException in the JM logs which you already
> shared? For now, it looks like that there was a connection problem between
> the TaskManager and the JobManager that caused the shutdown of the operator
> resulting in the NetworkBufferPool to be destroyed. For this scenario I
> would expect other failures to occur besides the ones you shared.
>
> Best,
> Matthias
>
> On Fri, Feb 26, 2021 at 8:28 PM Rainie Li <ra...@pinterest.com> wrote:
>
>> Thank you Matthias.
>> It’s version 1.9.
>>
>> Best regards
>> Rainie
>>
>> On Fri, Feb 26, 2021 at 6:33 AM Matthias Pohl <ma...@ververica.com>
>> wrote:
>>
>>> Hi Rainie,
>>> the network buffer pool was destroyed for some reason. This happens when
>>> the NettyShuffleEnvironment gets closed which is triggered when an operator
>>> is cleaned up, for instance. Maybe, the timeout in the metric system caused
>>> this. But I'm not sure how this is connected. I'm gonna add Chesnay to this
>>> conversation hoping that he can give more insights.
>>>
>>> If I may ask: What Flink version are you using?
>>>
>>> Thanks,
>>> Matthias
>>>
>>>
>>> On Fri, Feb 26, 2021 at 8:39 AM Rainie Li <ra...@pinterest.com>
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> Our Flink application kept restarting, and it makes a lot of RPC calls
>>>> to a dependency service.
>>>>
>>>> *We saw this exception from failed task manager log: *
>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>>> Could not forward element to next operator
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>> at
>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>> at
>>>> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:179)
>>>> at
>>>> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:173)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>>>> at
>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark(AbstractFetcher.java:436)
>>>> at
>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:402)
>>>> at
>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>>>> at
>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>>>> at
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)
>>>> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
>>>> at
>>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>>>> at
>>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>>>> at
>>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>> at
>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>> at
>>>> com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:203)
>>>> at
>>>> com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:189)
>>>> at
>>>> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>> ... 23 more
>>>> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>>>> at
>>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:239)
>>>> at
>>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:213)
>>>> at
>>>> org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:181)
>>>> at
>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:256)
>>>> at
>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:249)
>>>> at
>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:169)
>>>> at
>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:154)
>>>> at
>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
>>>> at
>>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>>>> ... 32 more
>>>>
>>>> *We also saw this exception from Job manager log:*
>>>> 2021-02-25 21:32:42,874 ERROR akka.remote.Remoting
>>>>                      - Association to [akka.tcp://flink-metrics@host:38593]
>>>> with UID [-1261564990] irrecoverably failed. Quarantining address.
>>>> java.util.concurrent.TimeoutException: Remote system has been silent
>>>> for too long. (more than 48.0 hours)
>>>> at
>>>> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:386)
>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>> at
>>>> akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207)
>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at
>>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> at
>>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>> This app has been running fine for a month.
>>>> Any suggestions on what could cause the issue, or on how to debug it?
>>>> All advice is appreciated.
>>>>
>>>> Thanks
>>>> Best regards
>>>> Rainie
>>>>
>>>