Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2018/03/20 10:49:30 UTC

[GitHub] flink pull request #5727: [FLINK-8964][tests] Port JobSubmissionFailsITCase ...

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/5727

    [FLINK-8964][tests] Port JobSubmissionFailsITCase to flip6

    ## What is the purpose of the change
    
    Ports the `JobSubmissionFailsITCase` to use `MiniClusterResource`.
    
    ## Brief change log
    
    * add utility method to `ExceptionUtils` for finding a `Throwable` matching a `Predicate`
    * remove `testSubmitNullJobGraph`; the test wasn't testing any runtime code
    * replace static working JobGraph with a factory method, as submitting the same JobGraph twice doesn't work on Flip6
    * introduce `MiniClusterResource`
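
The first change-log item can be pictured with a minimal sketch. The class name `ThrowableSearch` and the method body are assumptions made for illustration, not the code this PR adds to `ExceptionUtils`; only the idea of searching a cause chain for a `Throwable` that matches a `Predicate` comes from the description above.

```java
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical stand-in for the helper described above; not Flink's ExceptionUtils. */
class ThrowableSearch {

    /** Walks the cause chain from the outermost throwable and returns the first match, if any. */
    static Optional<Throwable> findThrowable(Throwable throwable, Predicate<Throwable> predicate) {
        Throwable current = throwable;
        while (current != null) {
            if (predicate.test(current)) {
                return Optional.of(current);
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // The interesting exception is wrapped one level deep.
        Exception wrapped = new RuntimeException("outer", new IllegalStateException("Test exception."));
        Optional<Throwable> match = findThrowable(wrapped, t -> "Test exception.".equals(t.getMessage()));
        System.out.println(match.isPresent());
    }
}
```

Matching on a predicate rather than on a fixed exception type lets a test assert on the root cause without depending on how many wrapper exceptions the client adds around it.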
    
    ## Verifying this change
    
    Run `JobSubmissionFailsITCase` with the `flip6` profile enabled and disabled.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 8964

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5727.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5727
    

---

[GitHub] flink pull request #5727: [FLINK-8964][tests] Port JobSubmissionFailsITCase ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5727#discussion_r176487399
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java ---
    @@ -105,90 +85,51 @@ public JobSubmissionFailsITCase(boolean detached) {
     
     	// --------------------------------------------------------------------------------------------
     
    -	private JobExecutionResult submitJob(JobGraph jobGraph) throws Exception {
    -		if (detached) {
    -			cluster.submitJobDetached(jobGraph);
    -			return null;
    -		}
    -		else {
    -			return cluster.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
    -		}
    -	}
    -
     	@Test
    -	public void testExceptionInInitializeOnMaster() {
    -		try {
    -			final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex");
    -			failingJobVertex.setInvokableClass(NoOpInvokable.class);
    -
    -			final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex);
    +	public void testExceptionInInitializeOnMaster() throws Exception {
    +		final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex");
    +		failingJobVertex.setInvokableClass(NoOpInvokable.class);
     
    -			try {
    -				submitJob(failingJobGraph);
    -				fail("Expected JobExecutionException.");
    -			}
    -			catch (JobExecutionException e) {
    -				assertEquals("Test exception.", e.getCause().getMessage());
    -			}
    -			catch (Throwable t) {
    -				t.printStackTrace();
    -				fail("Caught wrong exception of type " + t.getClass() + ".");
    -			}
    +		final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex);
     
    -			cluster.submitJobAndWait(workingJobGraph, false);
    -		}
    -		catch (Exception e) {
    -			e.printStackTrace();
    -			fail(e.getMessage());
    -		}
    -	}
    +		ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
    +		client.setDetached(detached);
     
    -	@Test
    -	public void testSubmitEmptyJobGraph() {
     		try {
    -			final JobGraph jobGraph = new JobGraph("Testing job");
    -
    -			try {
    -				submitJob(jobGraph);
    -				fail("Expected JobSubmissionException.");
    -			}
    -			catch (JobSubmissionException e) {
    -				assertTrue(e.getMessage() != null && e.getMessage().contains("empty"));
    +			client.submitJob(failingJobGraph, JobSubmissionFailsITCase.class.getClassLoader());
    +			fail("Job submission should have thrown an exception.");
    +		} catch (Exception e) {
    +			Optional<Throwable> expectedCause = ExceptionUtils.findThrowable(e,
    +				candidate -> candidate.getMessage() != null && candidate.getMessage().equals("Test exception."));
    +			if (!expectedCause.isPresent()) {
    +				throw e;
     			}
    -			catch (Throwable t) {
    -				t.printStackTrace();
    -				fail("Caught wrong exception of type " + t.getClass() + ".");
    -			}
    -
    -			cluster.submitJobAndWait(workingJobGraph, false);
    -		}
    -		catch (Exception e) {
    -			e.printStackTrace();
    -			fail(e.getMessage());
     		}
    +
    +		client.setDetached(false);
    +		client.submitJob(getWorkingJobGraph(), JobSubmissionFailsITCase.class.getClassLoader());
    --- End diff --
    
    Why didn't it work to submit the same `JobGraph` twice?


---

[GitHub] flink pull request #5727: [FLINK-8964][tests] Port JobSubmissionFailsITCase ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5727#discussion_r176489375
  
    Alright, it's because of the `RunningJobsRegistry`, which records that a previous job with the same `JobID` has already been executed.
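
The effect described in this comment can be modelled with a small sketch. Everything here is a simplified assumption: `Graph`, `Registry` and `getWorkingGraph` are hypothetical stand-ins, not Flink's `JobGraph`, `RunningJobsRegistry` or the test's `getWorkingJobGraph()`. It only shows why reusing one object (and so one ID) is rejected while a factory method that builds a fresh object is not.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

/** Simplified, hypothetical model; not Flink's RunningJobsRegistry or JobGraph. */
class DuplicateJobIdSketch {

    /** Stand-in for a job graph: the ID is fixed when the object is created. */
    static final class Graph {
        final UUID jobId = UUID.randomUUID();
    }

    /** Stand-in for a registry that remembers every job ID it has already seen. */
    static final class Registry {
        private final Set<UUID> seen = new HashSet<>();

        /** Returns true if the job is accepted, false if its ID was already recorded. */
        boolean submit(Graph graph) {
            return seen.add(graph.jobId);
        }
    }

    /** Factory method: each call yields a new graph and therefore a new ID. */
    static Graph getWorkingGraph() {
        return new Graph();
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        Graph shared = new Graph();
        System.out.println(registry.submit(shared));            // first submission of this ID
        System.out.println(registry.submit(shared));            // same ID submitted again
        System.out.println(registry.submit(getWorkingGraph())); // fresh ID from the factory
    }
}
```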


---

[GitHub] flink pull request #5727: [FLINK-8964][tests] Port JobSubmissionFailsITCase ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5727#discussion_r176487184
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java ---
    @@ -105,90 +85,51 @@ public JobSubmissionFailsITCase(boolean detached) {
     
     	// --------------------------------------------------------------------------------------------
     
    -	private JobExecutionResult submitJob(JobGraph jobGraph) throws Exception {
    -		if (detached) {
    -			cluster.submitJobDetached(jobGraph);
    -			return null;
    -		}
    -		else {
    -			return cluster.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
    -		}
    -	}
    -
     	@Test
    -	public void testExceptionInInitializeOnMaster() {
    -		try {
    -			final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex");
    -			failingJobVertex.setInvokableClass(NoOpInvokable.class);
    -
    -			final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex);
    +	public void testExceptionInInitializeOnMaster() throws Exception {
    +		final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex");
    +		failingJobVertex.setInvokableClass(NoOpInvokable.class);
     
    -			try {
    -				submitJob(failingJobGraph);
    -				fail("Expected JobExecutionException.");
    -			}
    -			catch (JobExecutionException e) {
    -				assertEquals("Test exception.", e.getCause().getMessage());
    -			}
    -			catch (Throwable t) {
    -				t.printStackTrace();
    -				fail("Caught wrong exception of type " + t.getClass() + ".");
    -			}
    +		final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex);
     
    -			cluster.submitJobAndWait(workingJobGraph, false);
    -		}
    -		catch (Exception e) {
    -			e.printStackTrace();
    -			fail(e.getMessage());
    -		}
    -	}
    +		ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
    +		client.setDetached(detached);
     
    -	@Test
    -	public void testSubmitEmptyJobGraph() {
     		try {
    -			final JobGraph jobGraph = new JobGraph("Testing job");
    -
    -			try {
    -				submitJob(jobGraph);
    -				fail("Expected JobSubmissionException.");
    -			}
    -			catch (JobSubmissionException e) {
    -				assertTrue(e.getMessage() != null && e.getMessage().contains("empty"));
    +			client.submitJob(failingJobGraph, JobSubmissionFailsITCase.class.getClassLoader());
    +			fail("Job submission should have thrown an exception.");
    +		} catch (Exception e) {
    +			Optional<Throwable> expectedCause = ExceptionUtils.findThrowable(e,
    +				candidate -> candidate.getMessage() != null && candidate.getMessage().equals("Test exception."));
    --- End diff --
    
    Could be simplified to `"Test exception.".equals(candidate.getMessage())`.
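
A short sketch of why the two forms are interchangeable: calling `equals` on the string literal is null-safe, because `String.equals(null)` returns `false` instead of throwing. The class name `NullSafeEqualsSketch` and the sample throwables are made up for illustration.

```java
import java.util.function.Predicate;

/** Hypothetical comparison of the two predicate forms discussed in the review comment. */
class NullSafeEqualsSketch {

    // Form used in the diff: explicit null check before comparing.
    static final Predicate<Throwable> VERBOSE =
        candidate -> candidate.getMessage() != null && candidate.getMessage().equals("Test exception.");

    // Suggested form: the literal is never null, so no separate check is needed.
    static final Predicate<Throwable> CONCISE =
        candidate -> "Test exception.".equals(candidate.getMessage());

    public static void main(String[] args) {
        Throwable[] samples = {
            new RuntimeException(),                  // getMessage() returns null
            new RuntimeException("Test exception."), // matching message
            new RuntimeException("other")            // non-matching message
        };
        for (Throwable t : samples) {
            // Prints whether both predicates agree on this sample.
            System.out.println(VERBOSE.test(t) == CONCISE.test(t));
        }
    }
}
```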


---

[GitHub] flink issue #5727: [FLINK-8964][tests] Port JobSubmissionFailsITCase to flip...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5727
  
    merging.


---

[GitHub] flink pull request #5727: [FLINK-8964][tests] Port JobSubmissionFailsITCase ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5727


---