Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2018/05/24 08:16:01 UTC

[GitHub] flink pull request #6068: [FLINK-9421] Remove job from RunningJobsRegistry w...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/6068

    [FLINK-9421] Remove job from RunningJobsRegistry when it reaches a terminal state

    ## What is the purpose of the change
    
    This commit lets the Dispatcher remove a completed job's RunningJobsRegistry entry
    when the job is removed from the Dispatcher.
    
    This PR is based on #6067 
    
    cc @GJL
    
    ## Verifying this change
    
    - Added `DispatcherResourceCleanupTest#testRunningJobsRegistryCleanup`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink runningJobsRegistryCleanup

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6068.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6068
    
----
commit 4b4d82cc5a3bb9694fd19a37c21345cbe7928962
Author: Till Rohrmann <tr...@...>
Date:   2018-05-23T16:50:27Z

    [FLINK-9427] Fix registration and request slot race condition in TaskExecutor
    
    This commit fixes a race condition between the TaskExecutor and the ResourceManager. Before,
    the ResourceManager could send a requestSlots message before the TaskExecutor
    registration was completed. As a result, the TaskExecutor did not have all the information it
    needed to accept task submissions.
    
    The problem was that the TaskExecutor sent the SlotReport at registration time. Due to this,
    the SlotManager could already assign these slots to pending slot requests. With this commit, the
    registration protocol changes such that the TaskExecutor first registers at the ResourceManager
    and only after completing this step announces the available slots to the SlotManager.
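
The two-step protocol described in this commit message can be illustrated with a minimal sketch. It is not Flink's implementation: `RegistrationSketch`, its methods, and the string identifiers are hypothetical stand-ins, and the real ResourceManager and SlotManager communicate asynchronously over RPC.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified stand-in for the ResourceManager side of the protocol. */
class RegistrationSketch {

    private final Set<String> registeredExecutors = new HashSet<>();
    private final List<String> freeSlots = new ArrayList<>();

    /** Step 1: register the executor. No slots are known to the manager yet. */
    boolean registerTaskExecutor(String executorId) {
        return registeredExecutors.add(executorId);
    }

    /** Step 2: accept a slot report, but only from an executor that completed step 1. */
    void sendSlotReport(String executorId, List<String> slotIds) {
        if (!registeredExecutors.contains(executorId)) {
            throw new IllegalStateException("Slot report from unregistered executor " + executorId);
        }
        freeSlots.addAll(slotIds);
    }

    /** Returns a free slot, or null if none has been announced yet. */
    String requestSlot() {
        return freeSlots.isEmpty() ? null : freeSlots.remove(0);
    }
}
```

Because slots only become visible in step 2, a slot request that arrives between the two steps finds nothing to assign, which is the ordering the commit message asks for.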

commit 4d034edca41294e250c49807a3beecb2b419824d
Author: Till Rohrmann <tr...@...>
Date:   2018-05-23T21:48:38Z

    [FLINK-9421] Remove job from RunningJobsRegistry when it reaches a terminal state
    
    This commit lets the Dispatcher remove a completed job's RunningJobsRegistry entry
    when the job is removed from the Dispatcher.
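
As a rough illustration of the cleanup this commit describes, the sketch below ties removal of a registry entry to completion of a job's result future. All names here (`DispatcherCleanupSketch`, `submitJob`, the string job IDs and states) are assumptions made for the example; Flink's Dispatcher and RunningJobsRegistry have different APIs.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical dispatcher that tracks running jobs in an in-memory registry. */
class DispatcherCleanupSketch {

    private final Map<String, String> registry = new ConcurrentHashMap<>();

    /**
     * Marks the job as running and returns a future that completes once the
     * registry entry has been removed again.
     */
    CompletableFuture<Void> submitJob(String jobId, CompletableFuture<String> resultFuture) {
        registry.put(jobId, "RUNNING");
        // Drop the entry when the job terminates, whether normally or with a failure.
        return resultFuture.<Void>handle((terminalState, failure) -> {
            registry.remove(jobId);
            return null;
        });
    }

    boolean contains(String jobId) {
        return registry.containsKey(jobId);
    }
}
```

Using `handle` rather than `thenApply` means the entry is also removed when the result future completes exceptionally, so a failed job does not leave a stale entry behind in this sketch.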

----


---

[GitHub] flink pull request #6068: [FLINK-9421] Remove job from RunningJobsRegistry w...

Posted by tillrohrmann <gi...@git.apache.org>.
GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6068#discussion_r190571258
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java ---
    @@ -271,6 +279,80 @@ public void testBlobServerCleanupWhenClosingDispatcher() throws Exception {
     		assertThat(deleteAllFuture.isDone(), is(false));
     	}
     
    +	/**
    +	 * Tests that the {@link RunningJobsRegistry} entries are cleared after the
    +	 * job reached a terminal state.
    +	 */
    +	@Test
    +	public void testRunningJobsRegistryCleanup() throws Exception {
    +		submitJob();
    +
    +		runningJobsRegistry.setJobRunning(jobId);
    +		assertThat(runningJobsRegistry.contains(jobId), is(true));
    +
    +		resultFuture.complete(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
    +
    +		// wait for the clearing
    +		runningJobsRegistry.getClearedFuture().get();
    +
    +		assertThat(runningJobsRegistry.contains(jobId), is(false));
    +	}
    +
    +	private static final class TestingRunningJobsRegistry implements RunningJobsRegistry {
    --- End diff --
    
    Good idea.


---

[GitHub] flink pull request #6068: [FLINK-9421] Remove job from RunningJobsRegistry w...

Posted by GJL <gi...@git.apache.org>.
GitHub user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6068#discussion_r190500268
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java ---
    @@ -271,6 +279,80 @@ public void testBlobServerCleanupWhenClosingDispatcher() throws Exception {
     		assertThat(deleteAllFuture.isDone(), is(false));
     	}
     
    +	/**
    +	 * Tests that the {@link RunningJobsRegistry} entries are cleared after the
    +	 * job reached a terminal state.
    +	 */
    +	@Test
    +	public void testRunningJobsRegistryCleanup() throws Exception {
    +		submitJob();
    +
    +		runningJobsRegistry.setJobRunning(jobId);
    +		assertThat(runningJobsRegistry.contains(jobId), is(true));
    +
    +		resultFuture.complete(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
    +
    +		// wait for the clearing
    +		runningJobsRegistry.getClearedFuture().get();
    +
    +		assertThat(runningJobsRegistry.contains(jobId), is(false));
    +	}
    +
    +	private static final class TestingRunningJobsRegistry implements RunningJobsRegistry {
    +
    +		private final JobID jobId;
    +
    +		private final CompletableFuture<Void> clearedFuture = new CompletableFuture<>();
    +
    +		private JobSchedulingStatus jobSchedulingStatus = JobSchedulingStatus.PENDING;
    +
    +		private boolean containsJob = false;
    +
    +		private TestingRunningJobsRegistry(JobID jobId) {
    +			this.jobId = jobId;
    +		}
    +
    +		public CompletableFuture<Void> getClearedFuture() {
    +			return clearedFuture;
    +		}
    +
    +		@Override
    +		public void setJobRunning(JobID jobID) throws IOException {
    +			checkJobId(jobID);
    +			containsJob = true;
    +			jobSchedulingStatus = JobSchedulingStatus.RUNNING;
    +		}
    +
    +		private void checkJobId(JobID jobID) {
    +			Preconditions.checkArgument(jobId.equals(jobID));
    --- End diff --
    
    Not the best variable names here: `jobId` vs. `jobID`
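
One way to avoid two names that differ only in case is to name the field and the parameter after their roles. The sketch below is only an illustration of that idea: `JobIdCheckSketch`, `expectedJobId`, and `actualJobId` are hypothetical names, and a plain `String` stands in for Flink's `JobID` type.

```java
import java.util.Objects;

/** Hypothetical helper that accepts calls for exactly one job. */
class JobIdCheckSketch {

    // The field is named after its role instead of differing from the parameter only in case.
    private final String expectedJobId;

    JobIdCheckSketch(String expectedJobId) {
        this.expectedJobId = Objects.requireNonNull(expectedJobId);
    }

    /** Throws IllegalArgumentException if the given ID is not the expected one. */
    void checkJobId(String actualJobId) {
        if (!expectedJobId.equals(actualJobId)) {
            throw new IllegalArgumentException(
                "Expected job " + expectedJobId + " but got " + actualJobId);
        }
    }
}
```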


---

[GitHub] flink pull request #6068: [FLINK-9421] Remove job from RunningJobsRegistry w...

Posted by tillrohrmann <gi...@git.apache.org>.
GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6068#discussion_r190571191
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java ---
    @@ -271,6 +279,80 @@ public void testBlobServerCleanupWhenClosingDispatcher() throws Exception {
     		assertThat(deleteAllFuture.isDone(), is(false));
     	}
     
    +	/**
    +	 * Tests that the {@link RunningJobsRegistry} entries are cleared after the
    +	 * job reached a terminal state.
    +	 */
    +	@Test
    +	public void testRunningJobsRegistryCleanup() throws Exception {
    +		submitJob();
    +
    +		runningJobsRegistry.setJobRunning(jobId);
    +		assertThat(runningJobsRegistry.contains(jobId), is(true));
    +
    +		resultFuture.complete(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
    +
    +		// wait for the clearing
    +		runningJobsRegistry.getClearedFuture().get();
    +
    +		assertThat(runningJobsRegistry.contains(jobId), is(false));
    +	}
    +
    +	private static final class TestingRunningJobsRegistry implements RunningJobsRegistry {
    +
    +		private final JobID jobId;
    +
    +		private final CompletableFuture<Void> clearedFuture = new CompletableFuture<>();
    +
    +		private JobSchedulingStatus jobSchedulingStatus = JobSchedulingStatus.PENDING;
    +
    +		private boolean containsJob = false;
    +
    +		private TestingRunningJobsRegistry(JobID jobId) {
    +			this.jobId = jobId;
    +		}
    +
    +		public CompletableFuture<Void> getClearedFuture() {
    +			return clearedFuture;
    +		}
    +
    +		@Override
    +		public void setJobRunning(JobID jobID) throws IOException {
    +			checkJobId(jobID);
    +			containsJob = true;
    +			jobSchedulingStatus = JobSchedulingStatus.RUNNING;
    +		}
    +
    +		private void checkJobId(JobID jobID) {
    +			Preconditions.checkArgument(jobId.equals(jobID));
    --- End diff --
    
    True, will change it.


---

[GitHub] flink issue #6068: [FLINK-9421] Remove job from RunningJobsRegistry when it ...

Posted by tillrohrmann <gi...@git.apache.org>.
GitHub user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/6068
  
    Thanks for the review @GJL. I'll address your comments and then merge the PR.


---

[GitHub] flink pull request #6068: [FLINK-9421] Remove job from RunningJobsRegistry w...

Posted by tillrohrmann <gi...@git.apache.org>.
GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6068#discussion_r190571226
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java ---
    @@ -271,6 +279,80 @@ public void testBlobServerCleanupWhenClosingDispatcher() throws Exception {
     		assertThat(deleteAllFuture.isDone(), is(false));
     	}
     
    +	/**
    +	 * Tests that the {@link RunningJobsRegistry} entries are cleared after the
    +	 * job reached a terminal state.
    +	 */
    +	@Test
    +	public void testRunningJobsRegistryCleanup() throws Exception {
    +		submitJob();
    +
    +		runningJobsRegistry.setJobRunning(jobId);
    +		assertThat(runningJobsRegistry.contains(jobId), is(true));
    +
    +		resultFuture.complete(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
    +
    +		// wait for the clearing
    +		runningJobsRegistry.getClearedFuture().get();
    +
    +		assertThat(runningJobsRegistry.contains(jobId), is(false));
    +	}
    +
    +	private static final class TestingRunningJobsRegistry implements RunningJobsRegistry {
    +
    +		private final JobID jobId;
    +
    +		private final CompletableFuture<Void> clearedFuture = new CompletableFuture<>();
    --- End diff --
    
    Good point. Will change it.


---

[GitHub] flink pull request #6068: [FLINK-9421] Remove job from RunningJobsRegistry w...

Posted by GJL <gi...@git.apache.org>.
GitHub user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6068#discussion_r190548607
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java ---
    @@ -271,6 +279,80 @@ public void testBlobServerCleanupWhenClosingDispatcher() throws Exception {
     		assertThat(deleteAllFuture.isDone(), is(false));
     	}
     
    +	/**
    +	 * Tests that the {@link RunningJobsRegistry} entries are cleared after the
    +	 * job reached a terminal state.
    +	 */
    +	@Test
    +	public void testRunningJobsRegistryCleanup() throws Exception {
    +		submitJob();
    +
    +		runningJobsRegistry.setJobRunning(jobId);
    +		assertThat(runningJobsRegistry.contains(jobId), is(true));
    +
    +		resultFuture.complete(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
    +
    +		// wait for the clearing
    +		runningJobsRegistry.getClearedFuture().get();
    +
    +		assertThat(runningJobsRegistry.contains(jobId), is(false));
    +	}
    +
    +	private static final class TestingRunningJobsRegistry implements RunningJobsRegistry {
    --- End diff --
    
    Maybe `SingleRunningJobRegistry`.


---

[GitHub] flink pull request #6068: [FLINK-9421] Remove job from RunningJobsRegistry w...

Posted by GJL <gi...@git.apache.org>.
GitHub user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6068#discussion_r190548385
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java ---
    @@ -271,6 +279,80 @@ public void testBlobServerCleanupWhenClosingDispatcher() throws Exception {
     		assertThat(deleteAllFuture.isDone(), is(false));
     	}
     
    +	/**
    +	 * Tests that the {@link RunningJobsRegistry} entries are cleared after the
    +	 * job reached a terminal state.
    +	 */
    +	@Test
    +	public void testRunningJobsRegistryCleanup() throws Exception {
    +		submitJob();
    +
    +		runningJobsRegistry.setJobRunning(jobId);
    +		assertThat(runningJobsRegistry.contains(jobId), is(true));
    +
    +		resultFuture.complete(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
    +
    +		// wait for the clearing
    +		runningJobsRegistry.getClearedFuture().get();
    +
    +		assertThat(runningJobsRegistry.contains(jobId), is(false));
    +	}
    +
    +	private static final class TestingRunningJobsRegistry implements RunningJobsRegistry {
    +
    +		private final JobID jobId;
    +
    +		private final CompletableFuture<Void> clearedFuture = new CompletableFuture<>();
    --- End diff --
    
    For simplicity I would use a `CountDownLatch` here.
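
A minimal sketch of what the `CountDownLatch` variant could look like, assuming a test registry that tracks a single job. `LatchRegistrySketch` and its methods are hypothetical and do not implement Flink's `RunningJobsRegistry` interface.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical single-job test registry that signals clearing through a latch. */
class LatchRegistrySketch {

    private final CountDownLatch clearedLatch = new CountDownLatch(1);

    private volatile boolean containsJob = false;

    void setJobRunning() {
        containsJob = true;
    }

    /** Removes the job and releases every thread waiting in awaitCleared. */
    void clearJob() {
        containsJob = false;
        clearedLatch.countDown();
    }

    boolean contains() {
        return containsJob;
    }

    /** Returns true if the job was cleared within the timeout, false otherwise. */
    boolean awaitCleared(long timeout, TimeUnit unit) {
        try {
            return clearedLatch.await(timeout, unit);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report that clearing was not observed.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A test would call `awaitCleared` where the diff above calls `getClearedFuture().get()`. The bounded wait is a choice made for this sketch so that a missing cleanup fails the test instead of blocking it indefinitely.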


---

[GitHub] flink pull request #6068: [FLINK-9421] Remove job from RunningJobsRegistry w...

Posted by asfgit <gi...@git.apache.org>.
GitHub user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6068


---