Posted to user@flink.apache.org by sohimankotia <so...@gmail.com> on 2017/11/21 06:52:43 UTC

How graceful shutdown or resource clean up happens in Flink at task level ?

Let's assume I have the following class:

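import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class TestFlatMap extends RichFlatMapFunction<String, String> {

	// The concrete Connection type is not specified in this post.
	private Connection connection;

	@Override
	public void open(Configuration parameters) throws Exception {
		super.open(parameters);
		// Open connection
	}

	@Override
	public void flatMap(String value, Collector<String> out) throws Exception {
		// Error while processing the record
	}

	@Override
	public void close() throws Exception {
		super.close();
		// Close connection
	}
}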


In which cases will Flink call close() to clean up resources if there is an
error in the flatMap function?

1. Some programmatic error (NullPointerException)
2. OutOfMemoryError
3. System.exit(0)


I just want to understand:

1. How will Flink handle the cleanup of resources (the code written in the
close() method)?
2. How does it handle graceful shutdown at the task level?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: How graceful shutdown or resource clean up happens in Flink at task level ?

Posted by sohimankotia <so...@gmail.com>.
Thanks, Stefan.




Re: How graceful shutdown or resource clean up happens in Flink at task level ?

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,


The user function’s close() method is called in AbstractStreamOperator::close() and ::dispose(). The call to the user function’s close() in AbstractStreamOperator::dispose() only has an effect if the method was not already invoked through AbstractStreamOperator::close().

AbstractStreamOperator::close() and ::dispose(), in turn, are called inside StreamTask::invoke(), which also runs the operator’s main processing loop (AbstractStreamOperator::run()). AbstractStreamOperator::close() is called through closeAllOperators(), after AbstractStreamOperator::run(). If run() exits with a Throwable, we end up in a catch block for Throwable that invokes AbstractStreamOperator::dispose(). So the UDF is closed either normally, after the operator’s run method has ended, or exceptionally, through the operator’s dispose() method.
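
The flow described above can be illustrated with a small, self-contained sketch. This is not Flink's source code: LifecycleSketch, UserFunction, Operator, invoke() and closeCountFor() are hypothetical stand-ins that only mimic the described behaviour (close() on the normal path, dispose() on the failure path, and a flag that prevents closing the user function twice).

```java
import java.util.Arrays;
import java.util.List;

public class LifecycleSketch {

    /** Hypothetical stand-in for a user function such as a RichFlatMapFunction. */
    interface UserFunction {
        void process(String value) throws Exception;
        void close() throws Exception;
    }

    /** Hypothetical stand-in for the operator that wraps the user function. */
    static class Operator {
        private final UserFunction fn;
        private boolean functionClosed = false;

        Operator(UserFunction fn) {
            this.fn = fn;
        }

        void process(String value) throws Exception {
            fn.process(value);
        }

        // Normal path: closes the user function and remembers that it did so.
        void close() throws Exception {
            functionClosed = true;
            fn.close();
        }

        // Failure path: closes the user function only if close() has not run yet.
        void dispose() throws Exception {
            if (!functionClosed) {
                functionClosed = true;
                fn.close();
            }
        }
    }

    /** Hypothetical stand-in for the task: process, then close; dispose on any Throwable. */
    static void invoke(Operator op, List<String> input) throws Exception {
        try {
            for (String value : input) { // the main processing loop
                op.process(value);
            }
            op.close();
        } catch (Throwable t) {
            op.dispose();
            throw t; // the failure is still propagated after cleanup
        }
    }

    /** Runs the sketch with a function that fails on "bad"; returns how often close() ran. */
    static int closeCountFor(List<String> input) {
        final int[] closed = {0};
        Operator op = new Operator(new UserFunction() {
            @Override
            public void process(String value) {
                if (value.equals("bad")) {
                    throw new NullPointerException("simulated failure");
                }
            }

            @Override
            public void close() {
                closed[0]++;
            }
        });
        try {
            invoke(op, input);
        } catch (Exception e) {
            // Expected for the failing input; cleanup has already happened.
        }
        try {
            op.dispose(); // a later dispose() must not close the function again
        } catch (Exception ignored) {
            // not reached in this sketch
        }
        return closed[0];
    }

    public static void main(String[] args) {
        System.out.println(closeCountFor(Arrays.asList("a", "b")));   // normal end
        System.out.println(closeCountFor(Arrays.asList("a", "bad"))); // failure in process()
    }
}
```

In both runs the user function's close() is invoked exactly once: through close() when the loop ends normally, and through dispose() when process() throws.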

For your 3 cases this means:

> 1. Some programmatic error (NullPointerException)

This ends up in the catch block around the operator’s run() method and reaches the UDF’s close() via AbstractStreamOperator::dispose().

> 2. OutOfMemoryError

Same as for (1.), but there is no strict guarantee that we have enough heap memory to actually perform the close() call.

> 3. System.exit(0)

This terminates the JVM immediately; no further code will be executed, and therefore no cleanup can happen.

Best,
Stefan