Posted to user@flink.apache.org by Soheil Pourbafrani <so...@gmail.com> on 2020/02/14 22:41:09 UTC

TaskManager Fail when I cancel the job and crash

Hi,

I developed a Flink job that reads a large number of files and, after some
simple preprocessing, writes the records to a database. I use the built-in
JDBCOutputFormat to insert the records. The problem is that when I cancel
the job, from either the WebUI or the command line, the job does not cancel
completely and the taskmanager process eventually crashes!
Here are the taskmanager logs (generated continuously for a few seconds):

2020-02-15 01:17:17,208 WARN  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The reader is stuck in method:
java.lang.Object.wait(Native Method)
org.postgresql.jdbc.PgStatement.killTimerTask(PgStatement.java:999)
org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:856)
org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1546)
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:210)
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:41)
org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:86)
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
2020-02-15 01:17:17,224 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
2020-02-15 01:17:17,225 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.


I'm using:
Flink: 1.7.2
Java: Java(TM) SE Runtime Environment (build 1.8.0_91-b14)

Any help will be appreciated.

All the best,
Soheil

Re: TaskManager Fail when I cancel the job and crash

Posted by Zhu Zhu <re...@gmail.com>.
Hi Soheil,

I think the root cause is that, during cancellation, the task was stuck in

*org.postgresql.jdbc.PgStatement.killTimerTask(PgStatement.java:999)*

The taskmanager process exit is expected in this case: it forces a failure
and recovery.
To be specific, when a task on the TM is canceled, a
*TaskCancelerWatchDog* is started to watch the cancellation.
If the cancellation times out, the watchdog triggers a fatal error to
force the TM to exit.
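
To illustrate the watchdog idea, here is a minimal, self-contained Java
sketch. It is not Flink's actual implementation: the class name
CancellationWatchdogSketch, the FatalErrorHandler interface and the
watchCancellation method are hypothetical names made up for this example,
and the sketch folds "request cancellation" and "watch the cancellation"
into one method for brevity. It only shows the pattern: interrupt the task
thread, wait a bounded time, and escalate to a fatal error if the thread is
still alive.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class CancellationWatchdogSketch {

    /** Hypothetical callback standing in for the TM's fatal error handling. */
    interface FatalErrorHandler {
        void onFatalError(String message);
    }

    /**
     * Interrupts the task thread and waits up to timeoutMillis (must be > 0)
     * for it to terminate. Returns true if the task stopped in time; otherwise
     * invokes the handler and returns false.
     */
    static boolean watchCancellation(Thread taskThread, long timeoutMillis,
                                     FatalErrorHandler handler)
            throws InterruptedException {
        taskThread.interrupt();          // ask the task to stop
        taskThread.join(timeoutMillis);  // bounded wait for it to finish
        if (taskThread.isAlive()) {
            handler.onFatalError("Task did not react to cancellation within "
                    + timeoutMillis + " ms");
            return false;
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        // A task that swallows interrupts for 2 seconds, similar in spirit
        // to a blocking call that does not return when the job is canceled.
        Thread stuck = new Thread(() -> {
            long end = System.currentTimeMillis() + 2000;
            while (System.currentTimeMillis() < end) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException ignored) {
                    // deliberately ignored to simulate an unresponsive task
                }
            }
        });
        stuck.setDaemon(true);
        stuck.start();

        AtomicBoolean fatal = new AtomicBoolean(false);
        boolean stopped = watchCancellation(stuck, 200, msg -> fatal.set(true));
        System.out.println("cancelled in time: " + stopped
                + ", fatal error raised: " + fatal.get());
    }
}
```

In this sketch the handler only sets a flag; in the situation described
above, the corresponding step is the fatal error that makes the TM exit.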

I think you need to diagnose why the PostgreSQL call took so long to
flush data.
Alternatively, if the long flush time is expected, you can increase
the cancellation timeout ("task.cancellation.timeout") to avoid this issue.
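
As a config fragment, raising the timeout in flink-conf.yaml could look like
the following. The value is in milliseconds and 600000 is only an
illustrative number, not a recommendation. As far as I know the default is
180000 ms, but please check the configuration docs for your Flink version.

```yaml
# flink-conf.yaml (illustrative value, adjust to your expected flush time)
# Time in milliseconds after which a stuck task cancellation leads to a
# fatal TaskManager error.
task.cancellation.timeout: 600000
```

The TaskManagers need to be restarted to pick up the new value.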

Thanks,
Zhu Zhu
