Posted to issues@flink.apache.org by "Chesnay Schepler (JIRA)" <ji...@apache.org> on 2018/11/07 10:39:00 UTC

[jira] [Commented] (FLINK-10721) kafkaFetcher runFetchLoop throw exception will cause follow-up code not execute in FlinkKafkaConsumerBase run method

    [ https://issues.apache.org/jira/browse/FLINK-10721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16678011#comment-16678011 ] 

Chesnay Schepler commented on FLINK-10721:
------------------------------------------

I would instead suggest modifying the `DiscoveryLoopThread` to catch and suppress exceptions thrown by the cancel call.
{code}
public void run() {
	try {
		...
	} catch (Exception e) {
		discoveryLoopErrorRef.set(e);
	} finally {
		// calling cancel will also let the fetcher loop escape
		// (if not running, cancel() was already called)
		if (running) {
			// <----- catch and suppress exceptions here ----->
			cancel();
		}
	}
}
{code}
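
A minimal, self-contained sketch of that idea follows. It is not the actual FlinkKafkaConsumerBase code: the class name DiscoveryLoopSketch and the helpers discoverPartitions() and cancel() are hypothetical stand-ins, and only discoveryLoopErrorRef and running mirror names from the snippet above. It assumes cancel() itself can throw, and shows that wrapping the call keeps the recorded discovery error intact.

```java
import java.util.concurrent.atomic.AtomicReference;

public class DiscoveryLoopSketch {
    // Mirrors the names in the snippet above; everything else is hypothetical.
    static final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
    static volatile boolean running = true;

    // Hypothetical stand-in for the discovery work; always fails in this sketch.
    static void discoverPartitions() throws Exception {
        throw new Exception("partition discovery failed");
    }

    // Hypothetical stand-in for the consumer's cancel(), assumed here to throw.
    static void cancel() {
        running = false;
        throw new IllegalStateException("failure while cancelling");
    }

    static void runDiscoveryLoop() {
        try {
            discoverPartitions();
        } catch (Exception e) {
            // Record the real culprit so the main thread can rethrow it later.
            discoveryLoopErrorRef.set(e);
        } finally {
            if (running) {
                try {
                    cancel();
                } catch (Exception suppressed) {
                    // Deliberately ignored: a failure inside cancel() must not
                    // escape this thread or mask the recorded discovery error.
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread discoveryLoopThread = new Thread(DiscoveryLoopSketch::runDiscoveryLoop);
        discoveryLoopThread.start();
        discoveryLoopThread.join();
        // prints "partition discovery failed"
        System.out.println(discoveryLoopErrorRef.get().getMessage());
    }
}
```

Whether the suppressed exception should also be logged, or attached to the recorded error with Throwable.addSuppressed, is a design choice this sketch leaves open.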

> kafkaFetcher runFetchLoop throw exception will cause follow-up code not execute in FlinkKafkaConsumerBase run method 
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-10721
>                 URL: https://issues.apache.org/jira/browse/FLINK-10721
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.6.2
>            Reporter: zhaoshijie
>            Priority: Major
>             Fix For: 1.6.3, 1.7.0
>
>
> In the FlinkKafkaConsumerBase run method, on line 721 (master branch), if kafkaFetcher.runFetchLoop() throws an exception, the code that follows it is not executed. This happens when the discoveryLoopThread throws an exception and its finally block calls cancel(): cancel() calls kafkaFetcher.cancel(), whose Kafka09Fetcher implementation calls handover.close(), which in turn makes handover.pollNext throw a ClosedException. In particular, discoveryLoopError is never thrown, so the real culprit exception is swallowed.
> The failure log looks like this:
> {code:java}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply$mcV$sp(JobManager.scala:1229)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>   at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:180)
>   at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.cancel(Kafka09Fetcher.java:174)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:753)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase$2.run(FlinkKafkaConsumerBase.java:695)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Should we modify it as follows?
> {code:java}
> try {
> 	kafkaFetcher.runFetchLoop();
> } catch (Exception e) {
> 	// if discoveryLoopErrorRef is set, rethrow the real culprit exception
> 	if (discoveryLoopErrorRef.get() != null) {
> 		throw new RuntimeException(discoveryLoopErrorRef.get());
> 	} else {
> 		throw e;
> 	}
> }
> {code}


