You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Gary Yao (Jira)" <ji...@apache.org> on 2019/11/14 16:42:00 UTC

[jira] [Closed] (FLINK-14680) Enable KafkaConsumerTestBase#runFailOnNoBrokerTest to pass with new DefaultScheduler

     [ https://issues.apache.org/jira/browse/FLINK-14680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gary Yao closed FLINK-14680.
----------------------------
    Resolution: Fixed

master: 2b5b792c21e33cc0d0c8c76846e6908984224663

> Enable KafkaConsumerTestBase#runFailOnNoBrokerTest to pass with new DefaultScheduler
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-14680
>                 URL: https://issues.apache.org/jira/browse/FLINK-14680
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination, Tests
>    Affects Versions: 1.10.0
>            Reporter: Gary Yao
>            Assignee: Gary Yao
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.10.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> {{KafkaConsumerTestBase#runFailOnNoBrokerTest}} has assumptions on the causal chain of the {{JobExecutionException}}. In particular, it assumes that the exception caused by user code is the direct cause of {{JobExecutionException}}. However, this is no longer true when using the {{DefaultScheduler}}, which wraps the exception in an {{JobException}}, which additionally specifies the reason of the job recovery suppression.
> The  code in question is listed below:
> {code:java}
> 		} catch (JobExecutionException jee) {
> 			if (kafkaServer.getVersion().equals("0.9") ||
> 				kafkaServer.getVersion().equals("0.10") ||
> 				kafkaServer.getVersion().equals("0.11") ||
> 				kafkaServer.getVersion().equals("2.0")) {
> 				assertTrue(jee.getCause() instanceof TimeoutException);
> 				TimeoutException te = (TimeoutException) jee.getCause();
> 				assertEquals("Timeout expired while fetching topic metadata", te.getMessage());
> 			} else {
> 				assertTrue(jee.getCause() instanceof RuntimeException);
> 				RuntimeException re = (RuntimeException) jee.getCause();
> 				assertTrue(re.getMessage().contains("Unable to retrieve any partitions"));
> 			}
> 		}
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)