Posted to user@flink.apache.org by Le Xu <sh...@gmail.com> on 2023/03/22 05:20:06 UTC

Job hanging taking savepoint on legacy Flink

Hello!

I would like to run a legacy Flink project on top of an old version of Flink
(1.4.1), and I'm getting an error when trying to cancel a job with a savepoint.
Specifically, the task manager reports the warning below, with the task stuck in requestBuffer:

My understanding is that the savepoint operation probably requires all
outstanding messages to be processed, which somehow requires more buffer
space (I'm not entirely sure about this). However, the job has no problem
processing regular messages as long as I'm not cancelling it with a savepoint.
I have also reduced "web.backpressure.refresh-interval" to 100 to force back
pressure to be checked more frequently, but it still runs into this error.
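
For reference, the cancel-with-savepoint is triggered through the CLI, roughly
like this (the savepoint directory and job id are placeholders), and the
refresh interval is set in flink-conf.yaml:

    # trigger cancel-with-savepoint from the Flink CLI
    bin/flink cancel -s <savepoint-directory> <job-id>

    # flink-conf.yaml: poll back pressure more frequently
    web.backpressure.refresh-interval: 100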

I am aware that I'd probably get more configuration knobs by running a newer
version of Flink, but this particular version contains some modified
functionality I want to try. Any suggestions?


2023-03-21 23:04:59,718 WARN org.apache.flink.runtime.taskmanager.Task -
Task 'Source: bid-source -> Filter -> Flat Map -> flatmap-timestamp (10/32)'
did not react to cancelling signal, but is stuck in method:
java.lang.Object.wait(Native Method)
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:222)
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:191)
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:146)
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:106)
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:88)
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator.processElement(TimestampsAndPunctuatedWatermarksOperator.java:52)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:552)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:527)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:507)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
ch.ethz.systems.strymon.ds2.flink.nexmark.queries.KeyedHighestBidCount$2.flatMap(KeyedHighestBidCount.java:245)
ch.ethz.systems.strymon.ds2.flink.nexmark.queries.KeyedHighestBidCount$2.flatMap(KeyedHighestBidCount.java:173)
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:552)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:527)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:507)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:552)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:527)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:507)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
source.NexmarkDynamicBatchSourceFunction.run(NexmarkDynamicBatchSourceFunction.java:403)
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
java.lang.Thread.run(Thread.java:748)

Re: Job hanging taking savepoint on legacy Flink

Posted by Shammon FY <zj...@gmail.com>.
Hi Le

It looks like there are not enough memory segments in the network buffer pool
of the task manager where the source wants to emit its result data.

You can check the network buffer pool usage in the web UI and try to increase
its size if it is running low. You can find more information about network
buffers in the documentation [1].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/network_mem_tuning/
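
For example, on Flink 1.4.x the network buffer memory can usually be enlarged
in flink-conf.yaml along these lines (the exact values depend on your
TaskManager memory, so treat them only as an illustrative starting point):

    # flink-conf.yaml -- illustrative values, not recommendations
    taskmanager.network.memory.fraction: 0.2      # fraction of JVM memory used for network buffers
    taskmanager.network.memory.min: 134217728     # lower bound, in bytes on this version (128 MB)
    taskmanager.network.memory.max: 1073741824    # upper bound, in bytes on this version (1 GB)
    # the older key taskmanager.network.numberOfBuffers can also be raised on legacy setups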

Best,
Shammon FY

