Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2018/05/24 08:12:23 UTC

[GitHub] flink pull request #6067: [FLINK-9427] Fix registration and request slot rac...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/6067

    [FLINK-9427] Fix registration and request slot race condition in TaskExecutor

    ## What is the purpose of the change
    
    This commit fixes a race condition between the TaskExecutor and the ResourceManager. Before,
    the ResourceManager could send a requestSlot message before the TaskExecutor had completed its
    registration. As a result, the TaskExecutor did not yet have all the information it needed
    to accept task submissions.
    
    The problem was that the TaskExecutor sent its SlotReport at registration time, so the
    SlotManager could already assign these slots to pending slot requests. With this commit, the
    registration protocol changes such that the TaskExecutor first registers at the ResourceManager
    and announces its available slots to the SlotManager only after this step has completed.
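    
    The reordering can be illustrated with a small synchronous toy model. Everything in it is hypothetical (`ToyResourceManager`, `ToyTaskExecutor`, `reportSlots`, string IDs), and it ignores RPC, fencing tokens and heartbeats. It only shows why announcing the slots after the registration has completed closes the race described above.
    
    ```java
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;
    import java.util.UUID;
    
    // Hypothetical, heavily simplified model of the handshake; not Flink code.
    class ToyResourceManager {
        private final Deque<String> pendingJobRequests = new ArrayDeque<>();
        private final List<String> assignments = new ArrayList<>();
    
        void requestSlotForJob(String jobId) {
            pendingJobRequests.add(jobId);
        }
    
        // Registration itself no longer carries any slot information.
        String registerTaskExecutor(String executorId) {
            return executorId + "/" + UUID.randomUUID();
        }
    
        // Reported slots are matched against pending requests right away,
        // mimicking the SlotManager assigning freshly announced slots.
        void reportSlots(ToyTaskExecutor executor, List<String> slotIds) {
            for (String slotId : slotIds) {
                String jobId = pendingJobRequests.poll();
                if (jobId == null) {
                    return; // no more pending requests
                }
                boolean accepted = executor.requestSlot(slotId, jobId);
                assignments.add(slotId + "->" + jobId + (accepted ? "" : " (rejected)"));
            }
        }
    
        List<String> assignments() {
            return assignments;
        }
    }
    
    class ToyTaskExecutor {
        private final String executorId;
        private String registrationId; // stays null until registration completes
    
        ToyTaskExecutor(String executorId) {
            this.executorId = executorId;
        }
    
        // New order: finish the registration, then announce the slots.
        void connect(ToyResourceManager rm, List<String> slotIds) {
            registrationId = rm.registerTaskExecutor(executorId);
            rm.reportSlots(this, slotIds);
        }
    
        // Old order (simplified): the slots reach the ResourceManager before
        // the executor has stored the result of its registration.
        void connectOldOrder(ToyResourceManager rm, List<String> slotIds) {
            rm.reportSlots(this, slotIds);
            registrationId = rm.registerTaskExecutor(executorId);
        }
    
        // Like the hardened TaskExecutor#requestSlot: refuse while unregistered.
        boolean requestSlot(String slotId, String jobId) {
            return registrationId != null;
        }
    }
    ```
    
    With the old ordering the slot is handed to a TaskExecutor that cannot accept it yet; with the new ordering the same pending request succeeds. The real protocol is asynchronous, which this toy deliberately leaves out.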
    
    cc @GJL 
    
    ## Brief change log
    
    - Changed the `TaskExecutor`/`ResourceManager` registration protocol so that the available slots are announced only after the registration has completed
    - Hardened `TaskExecutor#requestSlot` to accept the call only if there is an established connection to a `ResourceManager`
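    
    A minimal sketch of such a connection check, assuming the TaskExecutor keeps the leader ID of the ResourceManager it completed its registration with (null otherwise) and compares it with the ID passed along with `requestSlot`. The class `ResourceManagerConnectionState` and its methods are hypothetical; only the name `isConnectedToResourceManager` is taken from the PR diff.
    
    ```java
    import java.util.Objects;
    import java.util.UUID;
    
    // Hypothetical model of the TaskExecutor's connection state; not Flink code.
    final class ResourceManagerConnectionState {
        // Leader ID of the ResourceManager we completed registration with,
        // or null while no registration has completed.
        private UUID establishedLeaderId;
    
        void registrationCompleted(UUID leaderId) {
            establishedLeaderId = leaderId;
        }
    
        void connectionClosed() {
            establishedLeaderId = null;
        }
    
        // One predicate covers both "not connected at all" and "connected to
        // a different leader". The explicit null check matters because
        // Objects.equals(null, null) would otherwise return true.
        boolean isConnectedToResourceManager(UUID resourceManagerId) {
            return establishedLeaderId != null
                && Objects.equals(establishedLeaderId, resourceManagerId);
        }
    }
    ```
    
    Because the state only becomes non-null once the registration completes, a slot request that races ahead of the registration response is rejected instead of being processed with incomplete information.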
    
    ## Verifying this change
    
    - Added `SlotManagerTest#testSlotRequestFailure`
    - Added `TaskExecutorTest#testIgnoringSlotRequestsIfNotRegistered`, `testReconnectionAttemptIfExplicitlyDisconnected`, `testInitialSlotReport` and `testInitialSlotReportFailure`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixTaskExecutorRegistration

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6067.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6067
    
----
commit 4b4d82cc5a3bb9694fd19a37c21345cbe7928962
Author: Till Rohrmann <tr...@...>
Date:   2018-05-23T16:50:27Z

    [FLINK-9427] Fix registration and request slot race condition in TaskExecutor
    
    This commit fixes a race condition between the TaskExecutor and the ResourceManager. Before,
    the ResourceManager could send a requestSlot message before the TaskExecutor had completed its
    registration. As a result, the TaskExecutor did not yet have all the information it needed
    to accept task submissions.
    
    The problem was that the TaskExecutor sent its SlotReport at registration time, so the
    SlotManager could already assign these slots to pending slot requests. With this commit, the
    registration protocol changes such that the TaskExecutor first registers at the ResourceManager
    and announces its available slots to the SlotManager only after this step has completed.

----


---

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6067


---

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6067#discussion_r190603365
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---
    @@ -1483,6 +1485,216 @@ public void testMaximumRegistrationDurationAfterConnectionLoss() throws Exceptio
     		}
     	}
     
    +	/**
    +	 * Tests that we ignore slot requests if the TaskExecutor is not
    +	 * registered at a ResourceManager.
    +	 */
    +	@Test
    +	public void testIgnoringSlotRequestsIfNotRegistered() throws Exception {
    +		final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
    +		final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build();
    +
    +		final TaskExecutor taskExecutor = createTaskExecutor(taskManagerServices);
    +
    +		taskExecutor.start();
    +
    +		try {
    +			final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
    +
    +			final CompletableFuture<RegistrationResponse> registrationFuture = new CompletableFuture<>();
    +			final CompletableFuture<ResourceID> taskExecutorResourceIdFuture = new CompletableFuture<>();
    +
    +			testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5 -> {
    +                taskExecutorResourceIdFuture.complete(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1);
    +                return registrationFuture;
    +            });
    +
    +			rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
    +			resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID());
    +
    +			final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
    +
    +			final ResourceID resourceId = taskExecutorResourceIdFuture.get();
    +
    +			final SlotID slotId = new SlotID(resourceId, 0);
    +			final CompletableFuture<Acknowledge> slotRequestResponse = taskExecutorGateway.requestSlot(slotId, jobId, new AllocationID(), "foobar", testingResourceManagerGateway.getFencingToken(), timeout);
    +
    +			try {
    +				slotRequestResponse.get();
    +				fail("We should not be able to request slots before the TaskExecutor is registered at the ResourceManager.");
    +			} catch (ExecutionException ee) {
    +				assertThat(ExceptionUtils.stripExecutionException(ee), instanceOf(TaskManagerException.class));
    +			}
    +		} finally {
    +			RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
    +		}
    +	}
    +
    +	/**
    +	 * Tests that the TaskExecutor tries to reconnect to a ResourceManager from which it
    +	 * was explicitly disconnected.
    +	 */
    +	@Test
    +	public void testReconnectionAttemptIfExplicitlyDisconnected() throws Exception {
    +		final long heartbeatInterval = 1000L;
    +		final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
    +		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
    +		final TaskExecutor taskExecutor = new TaskExecutor(
    +			rpc,
    +			TaskManagerConfiguration.fromConfiguration(configuration),
    +			haServices,
    +			new TaskManagerServicesBuilder()
    +				.setTaskSlotTable(taskSlotTable)
    +				.setTaskManagerLocation(taskManagerLocation)
    +				.build(),
    +			new HeartbeatServices(heartbeatInterval, 1000L),
    +			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
    +			dummyBlobCacheService,
    +			testingFatalErrorHandler);
    +
    +		taskExecutor.start();
    +
    +		try {
    +			final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
    +			final ClusterInformation clusterInformation = new ClusterInformation("foobar", 1234);
    +			final CompletableFuture<RegistrationResponse> registrationResponseFuture = CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), heartbeatInterval, clusterInformation));
    +			final BlockingQueue<ResourceID> registrationQueue = new ArrayBlockingQueue<>(1);
    +
    +			testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5 -> {
    +                registrationQueue.offer(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1);
    --- End diff --
    
    Same here: spaces are used for indentation.


---

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6067#discussion_r190611399
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---
    @@ -1483,6 +1485,216 @@ public void testMaximumRegistrationDurationAfterConnectionLoss() throws Exceptio
     		}
     	}
     
    +	/**
    +	 * Tests that we ignore slot requests if the TaskExecutor is not
    +	 * registered at a ResourceManager.
    +	 */
    +	@Test
    +	public void testIgnoringSlotRequestsIfNotRegistered() throws Exception {
    +		final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
    +		final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build();
    +
    +		final TaskExecutor taskExecutor = createTaskExecutor(taskManagerServices);
    +
    +		taskExecutor.start();
    +
    +		try {
    +			final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
    +
    +			final CompletableFuture<RegistrationResponse> registrationFuture = new CompletableFuture<>();
    +			final CompletableFuture<ResourceID> taskExecutorResourceIdFuture = new CompletableFuture<>();
    +
    +			testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5 -> {
    +                taskExecutorResourceIdFuture.complete(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1);
    +                return registrationFuture;
    +            });
    +
    +			rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
    +			resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID());
    +
    +			final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
    +
    +			final ResourceID resourceId = taskExecutorResourceIdFuture.get();
    +
    +			final SlotID slotId = new SlotID(resourceId, 0);
    +			final CompletableFuture<Acknowledge> slotRequestResponse = taskExecutorGateway.requestSlot(slotId, jobId, new AllocationID(), "foobar", testingResourceManagerGateway.getFencingToken(), timeout);
    +
    +			try {
    +				slotRequestResponse.get();
    +				fail("We should not be able to request slots before the TaskExecutor is registered at the ResourceManager.");
    +			} catch (ExecutionException ee) {
    +				assertThat(ExceptionUtils.stripExecutionException(ee), instanceOf(TaskManagerException.class));
    +			}
    +		} finally {
    +			RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
    +		}
    +	}
    +
    +	/**
    +	 * Tests that the TaskExecutor tries to reconnect to a ResourceManager from which it
    +	 * was explicitly disconnected.
    +	 */
    +	@Test
    +	public void testReconnectionAttemptIfExplicitlyDisconnected() throws Exception {
    +		final long heartbeatInterval = 1000L;
    +		final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
    +		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
    +		final TaskExecutor taskExecutor = new TaskExecutor(
    +			rpc,
    +			TaskManagerConfiguration.fromConfiguration(configuration),
    +			haServices,
    +			new TaskManagerServicesBuilder()
    +				.setTaskSlotTable(taskSlotTable)
    +				.setTaskManagerLocation(taskManagerLocation)
    +				.build(),
    +			new HeartbeatServices(heartbeatInterval, 1000L),
    +			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
    +			dummyBlobCacheService,
    +			testingFatalErrorHandler);
    +
    +		taskExecutor.start();
    +
    +		try {
    +			final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
    +			final ClusterInformation clusterInformation = new ClusterInformation("foobar", 1234);
    +			final CompletableFuture<RegistrationResponse> registrationResponseFuture = CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), heartbeatInterval, clusterInformation));
    +			final BlockingQueue<ResourceID> registrationQueue = new ArrayBlockingQueue<>(1);
    +
    +			testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5 -> {
    +                registrationQueue.offer(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1);
    +                return registrationResponseFuture;
    +            });
    +			rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
    +
    +			resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID());
    +
    +			final ResourceID firstRegistrationAttempt = registrationQueue.take();
    +
    +			assertThat(firstRegistrationAttempt, equalTo(taskManagerLocation.getResourceID()));
    +
    +			final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
    +
    +			assertThat(registrationQueue.isEmpty(), is(true));
    --- End diff --
    
    Good point. Will change it.


---

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6067#discussion_r190557380
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -732,19 +744,10 @@ public void heartbeatFromResourceManager(ResourceID resourceID) {
     			allocationId, jobId, resourceManagerId);
     
     		try {
    -			if (resourceManagerConnection == null) {
    -				final String message = "TaskManager is not connected to a resource manager.";
    +			if (!isConnectedToResourceManager(resourceManagerId)) {
    +				final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId);
     				log.debug(message);
    -				throw new SlotAllocationException(message);
    -			}
    -
    -			if (!Objects.equals(resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) {
    -				final String message = "The leader id " + resourceManagerId +
    -					" does not match with the leader id of the connected resource manager " +
    -					resourceManagerConnection.getTargetLeaderId() + '.';
    -
    -				log.debug(message);
    -				throw new SlotAllocationException(message);
    +				throw new TaskManagerException(message);
    --- End diff --
    
    Why not return an exceptional future here?
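    
    For a local caller, the difference behind this question looks roughly like the sketch below. It is hypothetical (`SlotRequestStyles` and its methods are illustrative names, and `IllegalStateException` stands in for `TaskManagerException`) and does not model Flink's RPC layer, which may turn a thrown exception into a failed response on its own. A method that throws gives the caller two error channels to handle; returning an exceptionally completed future keeps the failure inside the `CompletableFuture` the signature already promises.
    
    ```java
    import java.util.concurrent.CompletableFuture;
    
    // Hypothetical illustration of the two error-reporting styles; not Flink code.
    final class SlotRequestStyles {
        private static final String MESSAGE = "TaskManager is not connected to the resource manager.";
    
        // Style 1: fail synchronously; the caller never receives a future.
        static CompletableFuture<String> requestSlotThrowing(boolean connected) {
            if (!connected) {
                throw new IllegalStateException(MESSAGE);
            }
            return CompletableFuture.completedFuture("ack");
        }
    
        // Style 2: report the failure through the future the signature promises.
        static CompletableFuture<String> requestSlotFailedFuture(boolean connected) {
            if (!connected) {
                CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(new IllegalStateException(MESSAGE));
                return failed;
            }
            return CompletableFuture.completedFuture("ack");
        }
    
        // A caller that only handles errors delivered through the future.
        static String describe(CompletableFuture<String> response) {
            return response
                .handle((ack, error) -> error == null ? ack : "failed: " + error.getClass().getSimpleName())
                .join();
        }
    
        private SlotRequestStyles() {}
    }
    ```
    
    With the second style, `describe` covers every outcome; with the first, the call site additionally needs a `try`/`catch` around the invocation itself.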


---

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6067#discussion_r190603900
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -732,19 +744,10 @@ public void heartbeatFromResourceManager(ResourceID resourceID) {
     			allocationId, jobId, resourceManagerId);
     
     		try {
    -			if (resourceManagerConnection == null) {
    -				final String message = "TaskManager is not connected to a resource manager.";
    +			if (!isConnectedToResourceManager(resourceManagerId)) {
    +				final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId);
     				log.debug(message);
    -				throw new SlotAllocationException(message);
    -			}
    -
    -			if (!Objects.equals(resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) {
    -				final String message = "The leader id " + resourceManagerId +
    -					" does not match with the leader id of the connected resource manager " +
    -					resourceManagerConnection.getTargetLeaderId() + '.';
    -
    -				log.debug(message);
    -				throw new SlotAllocationException(message);
    +				throw new TaskManagerException(message);
    --- End diff --
    
    We already talked about it offline.


---

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6067#discussion_r190603934
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -933,25 +956,35 @@ public void requestHeartbeat(ResourceID resourceID, SlotReport slotReport) {
     
     		blobCacheService.setBlobServerAddress(blobServerAddress);
     
    +		establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
    +			resourceManagerGateway,
    +			resourceManagerResourceId,
    +			taskExecutorRegistrationId);
    +
     		stopRegistrationTimeout();
     	}
     
     	private void closeResourceManagerConnection(Exception cause) {
    -		if (resourceManagerConnection != null) {
    -
    -			if (resourceManagerConnection.isConnected()) {
    -				if (log.isDebugEnabled()) {
    -					log.debug("Close ResourceManager connection {}.",
    -						resourceManagerConnection.getResourceManagerId(), cause);
    -				} else {
    -					log.info("Close ResourceManager connection {}.",
    -						resourceManagerConnection.getResourceManagerId());
    -				}
    -				resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());
    +		if (establishedResourceManagerConnection != null) {
    +			final ResourceID resourceManagerResourceId = establishedResourceManagerConnection.getResourceManagerResourceId();
     
    -				ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
    -				resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
    +			if (log.isDebugEnabled()) {
    +				log.debug("Close ResourceManager connection {}.",
    --- End diff --
    
    We already talked about it offline.


---

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6067#discussion_r190602158
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---
    @@ -1483,6 +1485,216 @@ public void testMaximumRegistrationDurationAfterConnectionLoss() throws Exceptio
     		}
     	}
     
    +	/**
    +	 * Tests that we ignore slot requests if the TaskExecutor is not
    +	 * registered at a ResourceManager.
    +	 */
    +	@Test
    +	public void testIgnoringSlotRequestsIfNotRegistered() throws Exception {
    +		final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
    +		final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build();
    +
    +		final TaskExecutor taskExecutor = createTaskExecutor(taskManagerServices);
    +
    +		taskExecutor.start();
    +
    +		try {
    +			final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
    +
    +			final CompletableFuture<RegistrationResponse> registrationFuture = new CompletableFuture<>();
    +			final CompletableFuture<ResourceID> taskExecutorResourceIdFuture = new CompletableFuture<>();
    +
    +			testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5 -> {
    +                taskExecutorResourceIdFuture.complete(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1);
    +                return registrationFuture;
    +            });
    +
    +			rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
    +			resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID());
    +
    +			final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
    +
    +			final ResourceID resourceId = taskExecutorResourceIdFuture.get();
    +
    +			final SlotID slotId = new SlotID(resourceId, 0);
    +			final CompletableFuture<Acknowledge> slotRequestResponse = taskExecutorGateway.requestSlot(slotId, jobId, new AllocationID(), "foobar", testingResourceManagerGateway.getFencingToken(), timeout);
    +
    +			try {
    +				slotRequestResponse.get();
    +				fail("We should not be able to request slots before the TaskExecutor is registered at the ResourceManager.");
    +			} catch (ExecutionException ee) {
    +				assertThat(ExceptionUtils.stripExecutionException(ee), instanceOf(TaskManagerException.class));
    +			}
    +		} finally {
    +			RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
    +		}
    +	}
    +
    +	/**
    +	 * Tests that the TaskExecutor tries to reconnect to a ResourceManager from which it
    +	 * was explicitly disconnected.
    +	 */
    +	@Test
    +	public void testReconnectionAttemptIfExplicitlyDisconnected() throws Exception {
    +		final long heartbeatInterval = 1000L;
    +		final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
    +		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
    +		final TaskExecutor taskExecutor = new TaskExecutor(
    +			rpc,
    +			TaskManagerConfiguration.fromConfiguration(configuration),
    +			haServices,
    +			new TaskManagerServicesBuilder()
    +				.setTaskSlotTable(taskSlotTable)
    +				.setTaskManagerLocation(taskManagerLocation)
    +				.build(),
    +			new HeartbeatServices(heartbeatInterval, 1000L),
    +			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
    +			dummyBlobCacheService,
    +			testingFatalErrorHandler);
    +
    +		taskExecutor.start();
    +
    +		try {
    +			final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
    +			final ClusterInformation clusterInformation = new ClusterInformation("foobar", 1234);
    +			final CompletableFuture<RegistrationResponse> registrationResponseFuture = CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), heartbeatInterval, clusterInformation));
    +			final BlockingQueue<ResourceID> registrationQueue = new ArrayBlockingQueue<>(1);
    +
    +			testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5 -> {
    +                registrationQueue.offer(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1);
    +                return registrationResponseFuture;
    +            });
    +			rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
    +
    +			resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID());
    +
    +			final ResourceID firstRegistrationAttempt = registrationQueue.take();
    +
    +			assertThat(firstRegistrationAttempt, equalTo(taskManagerLocation.getResourceID()));
    +
    +			final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
    +
    +			assertThat(registrationQueue.isEmpty(), is(true));
    --- End diff --
    
    nit: `assertThat(registrationQueue, is(empty()));` gives better failure messages. Now it's just a fancy way of writing `assertTrue()`.


---

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6067#discussion_r190611660
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---
    @@ -1483,6 +1485,216 @@ public void testMaximumRegistrationDurationAfterConnectionLoss() throws Exceptio
     		}
     	}
     
    +	/**
    +	 * Tests that we ignore slot requests if the TaskExecutor is not
    +	 * registered at a ResourceManager.
    +	 */
    +	@Test
    +	public void testIgnoringSlotRequestsIfNotRegistered() throws Exception {
    +		final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
    +		final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build();
    +
    +		final TaskExecutor taskExecutor = createTaskExecutor(taskManagerServices);
    +
    +		taskExecutor.start();
    +
    +		try {
    +			final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
    +
    +			final CompletableFuture<RegistrationResponse> registrationFuture = new CompletableFuture<>();
    +			final CompletableFuture<ResourceID> taskExecutorResourceIdFuture = new CompletableFuture<>();
    +
    +			testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5 -> {
    +                taskExecutorResourceIdFuture.complete(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1);
    +                return registrationFuture;
    +            });
    +
    +			rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
    +			resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID());
    +
    +			final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
    +
    +			final ResourceID resourceId = taskExecutorResourceIdFuture.get();
    +
    +			final SlotID slotId = new SlotID(resourceId, 0);
    +			final CompletableFuture<Acknowledge> slotRequestResponse = taskExecutorGateway.requestSlot(slotId, jobId, new AllocationID(), "foobar", testingResourceManagerGateway.getFencingToken(), timeout);
    +
    +			try {
    +				slotRequestResponse.get();
    +				fail("We should not be able to request slots before the TaskExecutor is registered at the ResourceManager.");
    +			} catch (ExecutionException ee) {
    +				assertThat(ExceptionUtils.stripExecutionException(ee), instanceOf(TaskManagerException.class));
    +			}
    +		} finally {
    +			RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
    +		}
    +	}
    +
    +	/**
    +	 * Tests that the TaskExecutor tries to reconnect to a ResourceManager from which it
    +	 * was explicitly disconnected.
    +	 */
    +	@Test
    +	public void testReconnectionAttemptIfExplicitlyDisconnected() throws Exception {
    +		final long heartbeatInterval = 1000L;
    +		final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
    +		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
    +		final TaskExecutor taskExecutor = new TaskExecutor(
    +			rpc,
    +			TaskManagerConfiguration.fromConfiguration(configuration),
    +			haServices,
    +			new TaskManagerServicesBuilder()
    +				.setTaskSlotTable(taskSlotTable)
    +				.setTaskManagerLocation(taskManagerLocation)
    +				.build(),
    +			new HeartbeatServices(heartbeatInterval, 1000L),
    +			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
    +			dummyBlobCacheService,
    +			testingFatalErrorHandler);
    +
    +		taskExecutor.start();
    +
    +		try {
    +			final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
    +			final ClusterInformation clusterInformation = new ClusterInformation("foobar", 1234);
    +			final CompletableFuture<RegistrationResponse> registrationResponseFuture = CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), heartbeatInterval, clusterInformation));
    +			final BlockingQueue<ResourceID> registrationQueue = new ArrayBlockingQueue<>(1);
    +
    +			testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5 -> {
    +                registrationQueue.offer(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1);
    --- End diff --
    
    Arrg, the same.


---

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6067#discussion_r190611581
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java ---
    @@ -1139,6 +1147,61 @@ public void testReportAllocatedSlot() throws Exception {
     		}
     	}
     
    +	/**
    +	 * Tests that the SlotManager retries allocating a slot if the TaskExecutor#requestSlot call
    +	 * fails.
    +	 */
    +	@Test
    +	public void testSlotRequestFailure() throws Exception {
    +		try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActions())) {
    +
    +			final SlotRequest slotRequest = new SlotRequest(new JobID(), new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
    +			slotManager.registerSlotRequest(slotRequest);
    +
    +			final BlockingQueue<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> requestSlotQueue = new ArrayBlockingQueue<>(1);
    +			final BlockingQueue<CompletableFuture<Acknowledge>> responseQueue = new ArrayBlockingQueue<>(1);
    +
    +			final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
    +				.setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> {
    +                    requestSlotQueue.offer(slotIDJobIDAllocationIDStringResourceManagerIdTuple5);
    --- End diff --
    
    Arrg, spaces.... How did they make it into this PR? Will get rid of them.


---

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6067#discussion_r190583266
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -933,25 +956,35 @@ public void requestHeartbeat(ResourceID resourceID, SlotReport slotReport) {
     
     		blobCacheService.setBlobServerAddress(blobServerAddress);
     
    +		establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
    +			resourceManagerGateway,
    +			resourceManagerResourceId,
    +			taskExecutorRegistrationId);
    +
     		stopRegistrationTimeout();
     	}
     
     	private void closeResourceManagerConnection(Exception cause) {
    -		if (resourceManagerConnection != null) {
    -
    -			if (resourceManagerConnection.isConnected()) {
    -				if (log.isDebugEnabled()) {
    -					log.debug("Close ResourceManager connection {}.",
    -						resourceManagerConnection.getResourceManagerId(), cause);
    -				} else {
    -					log.info("Close ResourceManager connection {}.",
    -						resourceManagerConnection.getResourceManagerId());
    -				}
    -				resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());
    +		if (establishedResourceManagerConnection != null) {
    +			final ResourceID resourceManagerResourceId = establishedResourceManagerConnection.getResourceManagerResourceId();
     
    -				ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
    -				resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
    +			if (log.isDebugEnabled()) {
    +				log.debug("Close ResourceManager connection {}.",
    --- End diff --
    
    I was wondering whether `cause` can get logged twice:
    
    1.
    ```
    log.debug("Close ResourceManager connection {}.",
        resourceManagerResourceId, cause);
    ```
    2.
    ```
    log.debug("Terminating registration attempts towards ResourceManager {}.",
        resourceManagerConnection.getTargetAddress(), cause);
    ```
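    
    One way to keep a single stack trace per disconnect, sketched under assumptions: the outer method that receives `cause` attaches it to its log statement (only at debug level, mirroring the `isDebugEnabled` branch in the diff), and the helper it calls logs a one-line message without the cause. `DisconnectLogging`, `closeConnection` and `terminateRegistrationAttempts` are hypothetical names, and the list-backed log replaces SLF4J only so that the example is self-contained.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    // Hypothetical illustration; not Flink code and not the SLF4J API.
    final class DisconnectLogging {
        // Each entry is a message, tagged if a throwable was attached.
        final List<String> log = new ArrayList<>();
        private final boolean debugEnabled;
    
        DisconnectLogging(boolean debugEnabled) {
            this.debugEnabled = debugEnabled;
        }
    
        private void logLine(String message, Throwable cause) {
            log.add(cause == null ? message : message + " [with stack trace]");
        }
    
        void closeConnection(String resourceManagerId, String targetAddress, Exception cause) {
            // The only statement that attaches the cause, and only at debug level.
            String message = "Close ResourceManager connection " + resourceManagerId + ".";
            logLine(message, debugEnabled ? cause : null);
            terminateRegistrationAttempts(targetAddress);
        }
    
        private void terminateRegistrationAttempts(String targetAddress) {
            // Deliberately no cause here: the caller has already logged it.
            logLine("Terminating registration attempts towards ResourceManager " + targetAddress + ".", null);
        }
    
        long stackTraces() {
            return log.stream().filter(line -> line.endsWith("[with stack trace]")).count();
        }
    }
    ```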


---

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6067#discussion_r190602807
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java ---
    @@ -1139,6 +1147,61 @@ public void testReportAllocatedSlot() throws Exception {
     		}
     	}
     
    +	/**
    +	 * Tests that the SlotManager retries allocating a slot if the TaskExecutor#requestSlot call
    +	 * fails.
    +	 */
    +	@Test
    +	public void testSlotRequestFailure() throws Exception {
    +		try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActions())) {
    +
    +			final SlotRequest slotRequest = new SlotRequest(new JobID(), new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
    +			slotManager.registerSlotRequest(slotRequest);
    +
    +			final BlockingQueue<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> requestSlotQueue = new ArrayBlockingQueue<>(1);
    +			final BlockingQueue<CompletableFuture<Acknowledge>> responseQueue = new ArrayBlockingQueue<>(1);
    +
    +			final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
    +				.setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> {
    +                    requestSlotQueue.offer(slotIDJobIDAllocationIDStringResourceManagerIdTuple5);
    --- End diff --
    
    I think you are using spaces for indentation here.


---