You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/06 05:45:51 UTC

[flink] branch master updated (57f1ba2 -> 6644321)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 57f1ba2  [FLINK-13584][table-planner-blink] RankLikeAggFunctionBase should take type into account when generate literal expression
     new d6dbf88  [hotfix][runtime] Fix checkstyles in SlotManagerImpl class.
     new 6644321  [FLINK-13555][runtime] Fail slot requests immediately at the SlotPool if unfulfillable

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/jobmaster/slotpool/SlotPoolImpl.java   | 11 ++++-
 .../runtime/resourcemanager/ResourceManager.java   |  3 +-
 .../UnfulfillableSlotRequestException.java}        | 18 ++++----
 .../resourcemanager/slotmanager/SlotManager.java   |  5 +-
 .../slotmanager/SlotManagerException.java          | 34 --------------
 .../slotmanager/SlotManagerImpl.java               | 19 ++++----
 .../slotpool/SlotPoolBatchSlotRequestTest.java     | 54 ++++++++++++++++++++--
 .../runtime/jobmaster/slotpool/SlotPoolUtils.java  |  4 +-
 .../SlotManagerFailUnfulfillableTest.java          | 23 +++++----
 .../slotmanager/SlotManagerTest.java               |  4 +-
 10 files changed, 101 insertions(+), 74 deletions(-)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/{taskexecutor/slot/SlotNotActiveException.java => resourcemanager/exceptions/UnfulfillableSlotRequestException.java} (55%)
 delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java


[flink] 01/02: [hotfix][runtime] Fix checkstyles in SlotManagerImpl class.

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d6dbf88bd8c68243ba4dcc31ffd6e7d55a34185d
Author: Xintong Song <to...@gmail.com>
AuthorDate: Fri Aug 2 14:42:40 2019 +0200

    [hotfix][runtime] Fix checkstyles in SlotManagerImpl class.
---
 .../flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java    | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
index 4e4124b..cd894e3 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
@@ -117,7 +117,6 @@ public class SlotManagerImpl implements SlotManager {
 
 	/**
 	 * If true, fail unfulfillable slot requests immediately. Otherwise, allow unfulfillable request to pend.
-	 *
 	 * A slot request is considered unfulfillable if it cannot be fulfilled by neither a slot that is already registered
 	 * (including allocated ones) nor a pending slot that the {@link ResourceActions} can allocate.
 	 * */
@@ -211,8 +210,7 @@ public class SlotManagerImpl implements SlotManager {
 	 * @param newResourceActions to use for resource (de-)allocations
 	 */
 	@Override
-	public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor,
-					  ResourceActions newResourceActions) {
+	public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
 		LOG.info("Starting the SlotManager.");
 
 		this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);


[flink] 02/02: [FLINK-13555][runtime] Fail slot requests immediately at the SlotPool if unfulfillable

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6644321f8637d96bd1b74fa6b244bb2bf22d194d
Author: Xintong Song <to...@gmail.com>
AuthorDate: Fri Aug 2 15:02:06 2019 +0200

    [FLINK-13555][runtime] Fail slot requests immediately at the SlotPool if unfulfillable
    
    Remove the SlotManagerException as extension of ResourceManagerException.
    
    [FLINK-13555][runtime][test] Add cases in SlotPoolBatchSlotRequestTest validating that pending batch slot requests fail on UnfulfillableSlotRequestException.
    
    [FLINK-13555][runtime] Add information about which slot request failed to the stack trace of the exception thrown in SlotManager#registerSlotRequest.
    
    [FLINK-13555][runtime][test] Update SlotManagerFailUnfulfillableTest to validate that slot manager throws UnfulfillableSlotRequestException for failing unfulfillable requests.
    
    This closes #9339.
---
 .../runtime/jobmaster/slotpool/SlotPoolImpl.java   | 11 ++++-
 .../runtime/resourcemanager/ResourceManager.java   |  3 +-
 .../UnfulfillableSlotRequestException.java}        | 22 +++++----
 .../resourcemanager/slotmanager/SlotManager.java   |  5 +-
 .../slotmanager/SlotManagerImpl.java               | 15 +++---
 .../slotpool/SlotPoolBatchSlotRequestTest.java     | 54 ++++++++++++++++++++--
 .../runtime/jobmaster/slotpool/SlotPoolUtils.java  |  4 +-
 .../SlotManagerFailUnfulfillableTest.java          | 23 +++++----
 .../slotmanager/SlotManagerTest.java               |  4 +-
 9 files changed, 102 insertions(+), 39 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
index 14beb8f..a381613 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
@@ -38,9 +38,11 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.clock.Clock;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
@@ -348,7 +350,7 @@ public class SlotPoolImpl implements SlotPool {
 	private void slotRequestToResourceManagerFailed(SlotRequestId slotRequestID, Throwable failure) {
 		final PendingRequest request = pendingRequests.getKeyA(slotRequestID);
 		if (request != null) {
-			if (request.isBatchRequest) {
+			if (isBatchRequestAndFailureCanBeIgnored(request, failure)) {
 				log.debug("Ignoring failed request to the resource manager for a batch slot request.");
 			} else {
 				pendingRequests.removeKeyA(slotRequestID);
@@ -370,6 +372,11 @@ public class SlotPoolImpl implements SlotPool {
 		waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest);
 	}
 
+	private boolean isBatchRequestAndFailureCanBeIgnored(PendingRequest request, Throwable failure){
+		return request.isBatchRequest &&
+			!ExceptionUtils.findThrowable(failure, UnfulfillableSlotRequestException.class).isPresent();
+	}
+
 	// ------------------------------------------------------------------------
 	//  Slot releasing & offering
 	// ------------------------------------------------------------------------
@@ -679,7 +686,7 @@ public class SlotPoolImpl implements SlotPool {
 
 		final PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
 		if (pendingRequest != null) {
-			if (pendingRequest.isBatchRequest) {
+			if (isBatchRequestAndFailureCanBeIgnored(pendingRequest, cause)) {
 				// pending batch requests don't react to this signal --> put it back
 				pendingRequests.put(pendingRequest.getSlotRequestId(), allocationID, pendingRequest);
 			} else {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index ec71e85..13544d3 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -58,7 +58,6 @@ import org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistrat
 import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
 import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
@@ -442,7 +441,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 
 				try {
 					slotManager.registerSlotRequest(slotRequest);
-				} catch (SlotManagerException e) {
+				} catch (ResourceManagerException e) {
 					return FutureUtils.completedExceptionally(e);
 				}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/UnfulfillableSlotRequestException.java
similarity index 51%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/UnfulfillableSlotRequestException.java
index c322c81..fb6eb45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/UnfulfillableSlotRequestException.java
@@ -16,19 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.resourcemanager.slotmanager;
+package org.apache.flink.runtime.resourcemanager.exceptions;
 
-import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 
-public class SlotManagerException extends ResourceManagerException {
 
-	private static final long serialVersionUID = -3723028616920379071L;
-
-	public SlotManagerException(String message) {
-		super(message);
-	}
+/**
+ * Exception denoting that a slot request can not be fulfilled by any slot in the cluster.
+ * This usually indicates that the slot request should not be pended or retried.
+ */
+public class UnfulfillableSlotRequestException extends ResourceManagerException {
+	private static final long serialVersionUID = 4453490263648758730L;
 
-	public SlotManagerException(String message, Throwable cause) {
-		super(message, cause);
+	public UnfulfillableSlotRequestException(AllocationID allocationId, ResourceProfile resourceProfile) {
+		super("Could not fulfill slot request " + allocationId + ". "
+			+ "Requested resource profile (" + resourceProfile + ") is unfulfillable.");
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 0bdbc5b..b177e11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 
@@ -73,9 +74,9 @@ public interface SlotManager extends AutoCloseable {
 	 *
 	 * @param slotRequest specifying the requested slot specs
 	 * @return true if the slot request was registered; false if the request is a duplicate
-	 * @throws SlotManagerException if the slot request failed (e.g. not enough resources left)
+	 * @throws ResourceManagerException if the slot request failed (e.g. not enough resources left)
 	 */
-	boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException;
+	boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException;
 
 	/**
 	 * Cancels and removes a pending slot request with the given allocation id. If there is no such
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
index cd894e3..6fed1fe 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
@@ -290,10 +291,10 @@ public class SlotManagerImpl implements SlotManager {
 	 *
 	 * @param slotRequest specifying the requested slot specs
 	 * @return true if the slot request was registered; false if the request is a duplicate
-	 * @throws SlotManagerException if the slot request failed (e.g. not enough resources left)
+	 * @throws ResourceManagerException if the slot request failed (e.g. not enough resources left)
 	 */
 	@Override
-	public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException {
+	public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
 		checkInit();
 
 		if (checkDuplicateRequest(slotRequest.getAllocationId())) {
@@ -311,7 +312,7 @@ public class SlotManagerImpl implements SlotManager {
 				// requesting the slot failed --> remove pending slot request
 				pendingSlotRequests.remove(slotRequest.getAllocationId());
 
-				throw new SlotManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
+				throw new ResourceManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
 			}
 
 			return true;
@@ -494,8 +495,7 @@ public class SlotManagerImpl implements SlotManager {
 					resourceActions.notifyAllocationFailure(
 						pendingSlotRequest.getJobId(),
 						pendingSlotRequest.getAllocationId(),
-						new ResourceManagerException("Could not fulfill slot request " + pendingSlotRequest.getAllocationId() + ". "
-							+ "Requested resource profile (" + pendingSlotRequest.getResourceProfile() + ") is unfulfillable.")
+						new UnfulfillableSlotRequestException(pendingSlotRequest.getAllocationId(), pendingSlotRequest.getResourceProfile())
 					);
 				}
 			}
@@ -745,7 +745,7 @@ public class SlotManagerImpl implements SlotManager {
 	 * registered.
 	 *
 	 * @param pendingSlotRequest to allocate a slot for
-	 * @throws ResourceManagerException if the resource manager cannot allocate more resource
+	 * @throws ResourceManagerException if the slot request failed or is unfulfillable
 	 */
 	private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
 		final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
@@ -767,8 +767,7 @@ public class SlotManagerImpl implements SlotManager {
 				// request can not be fulfilled by any free slot or pending slot that can be allocated,
 				// check whether it can be fulfilled by allocated slots
 				if (failUnfulfillableRequest && !isFulfillableByRegisteredSlots(pendingSlotRequest.getResourceProfile())) {
-					throw new ResourceManagerException("Requested resource profile (" +
-						pendingSlotRequest.getResourceProfile() + ") is unfulfillable.");
+					throw new UnfulfillableSlotRequestException(pendingSlotRequest.getAllocationId(), pendingSlotRequest.getResourceProfile());
 				}
 			}
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
index 2231060..df4dfb3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.util.clock.ManualClock;
 import org.apache.flink.util.ExceptionUtils;
@@ -129,7 +130,7 @@ public class SlotPoolBatchSlotRequestTest extends TestLogger {
 
 	/**
 	 * Tests that a batch slot request does not react to {@link SlotPool#failAllocation(AllocationID, Exception)}
-	 * signals.
+	 * signals whose exception is not {@link UnfulfillableSlotRequestException}.
 	 */
 	@Test
 	public void testPendingBatchSlotRequestDoesNotFailIfAllocationFails() throws Exception {
@@ -147,14 +148,40 @@ public class SlotPoolBatchSlotRequestTest extends TestLogger {
 
 			final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);
 
-			SlotPoolUtils.failAllocation(slotPool, directMainThreadExecutor, allocationIdFuture.get());
+			SlotPoolUtils.failAllocation(slotPool, directMainThreadExecutor, allocationIdFuture.get(), new FlinkException("Failed request"));
 
 			assertThat(slotFuture.isDone(), is(false));
 		}
 	}
 
 	/**
-	 * Tests that a batch slot request won't fail if its resource manager request fails.
+	 * Tests that a batch slot request does react to {@link SlotPool#failAllocation(AllocationID, Exception)}
+	 * signals whose exception is {@link UnfulfillableSlotRequestException}.
+	 */
+	@Test
+	public void testPendingBatchSlotRequestFailsIfAllocationFailsUnfulfillably() throws Exception {
+		final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
+		final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
+		testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+		final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
+
+		try (final SlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor)
+			.setResourceManagerGateway(testingResourceManagerGateway)
+			.build()) {
+
+			final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);
+
+			SlotPoolUtils.failAllocation(slotPool, directMainThreadExecutor, allocationIdFuture.get(),
+				new UnfulfillableSlotRequestException(new AllocationID(), ResourceProfile.UNKNOWN));
+
+			assertThat(slotFuture.isCompletedExceptionally(), is(true));
+		}
+	}
+
+	/**
+	 * Tests that a batch slot request won't fail if its resource manager request fails with exceptions other than
+	 * {@link UnfulfillableSlotRequestException}.
 	 */
 	@Test
 	public void testPendingBatchSlotRequestDoesNotFailIfRMRequestFails() throws Exception {
@@ -176,6 +203,27 @@ public class SlotPoolBatchSlotRequestTest extends TestLogger {
 	}
 
 	/**
+	 * Tests that a batch slot request fails if its resource manager request fails with {@link UnfulfillableSlotRequestException}.
+	 */
+	@Test
+	public void testPendingBatchSlotRequestFailsIfRMRequestFailsUnfulfillably() throws Exception {
+		final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
+		testingResourceManagerGateway.setRequestSlotFuture(FutureUtils.completedExceptionally(
+			new UnfulfillableSlotRequestException(new AllocationID(), ResourceProfile.UNKNOWN)));
+
+		final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
+
+		try (final SlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor)
+			.setResourceManagerGateway(testingResourceManagerGateway)
+			.build()) {
+
+			final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);
+
+			assertThat(slotFuture.isCompletedExceptionally(), is(true));
+		}
+	}
+
+	/**
 	 * Tests that a pending batch slot request times out after the last fulfilling slot gets
 	 * released.
 	 */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
index c4dfe88..3836881 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
@@ -97,9 +97,9 @@ public class SlotPoolUtils {
 		return taskManagerLocation.getResourceID();
 	}
 
-	public static void failAllocation(SlotPoolImpl slotPool, ComponentMainThreadExecutor mainThreadExecutor, AllocationID allocationId) {
+	public static void failAllocation(SlotPoolImpl slotPool, ComponentMainThreadExecutor mainThreadExecutor, AllocationID allocationId, Exception exception) {
 		CompletableFuture.runAsync(
-			() -> slotPool.failAllocation(allocationId, new FlinkException("Test exception")),
+			() -> slotPool.failAllocation(allocationId, exception),
 			mainThreadExecutor).join();
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
index cc4a2c1..af31c3a 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -26,11 +27,14 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -40,6 +44,7 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -73,7 +78,7 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger {
 		final ResourceProfile availableProfile = new ResourceProfile(2.0, 100);
 		final ResourceProfile unfulfillableProfile = new ResourceProfile(1.0, 200);
 
-		final List<AllocationID> allocationFailures = new ArrayList<>();
+		final List<Tuple3<JobID, AllocationID, Exception>> allocationFailures = new ArrayList<>();
 		final SlotManager slotManager = createSlotManagerNotStartingNewTMs(allocationFailures);
 		slotManager.setFailUnfulfillableRequest(false);
 		registerFreeSlot(slotManager, availableProfile);
@@ -85,7 +90,8 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger {
 
 		// assert
 		assertEquals(1, allocationFailures.size());
-		assertEquals(request.getAllocationId(), allocationFailures.get(0));
+		assertEquals(request.getAllocationId(), allocationFailures.get(0).f1);
+		assertTrue(ExceptionUtils.findThrowable(allocationFailures.get(0).f2, UnfulfillableSlotRequestException.class).isPresent());
 		assertEquals(0, slotManager.getNumberPendingSlotRequests());
 	}
 
@@ -124,12 +130,12 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger {
 	}
 
 	@Test
-	public void testUnfulfillableRequestsFailWhenOn() throws Exception {
+	public void testUnfulfillableRequestsFailWhenOn() {
 		// setup
 		final ResourceProfile availableProfile = new ResourceProfile(2.0, 100);
 		final ResourceProfile unfulfillableProfile = new ResourceProfile(2.0, 200);
 
-		final List<AllocationID> notifiedAllocationFailures = new ArrayList<>();
+		final List<Tuple3<JobID, AllocationID, Exception>> notifiedAllocationFailures = new ArrayList<>();
 		final SlotManager slotManager = createSlotManagerNotStartingNewTMs(notifiedAllocationFailures);
 		registerFreeSlot(slotManager, availableProfile);
 
@@ -137,8 +143,9 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger {
 		try {
 			slotManager.registerSlotRequest(slotRequest(unfulfillableProfile));
 			fail("this should cause an exception");
+		} catch (ResourceManagerException exception) {
+			assertTrue(ExceptionUtils.findThrowable(exception, UnfulfillableSlotRequestException.class).isPresent());
 		}
-		catch (SlotManagerException ignored) {}
 
 		// assert
 		assertEquals(0, notifiedAllocationFailures.size());
@@ -169,7 +176,7 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger {
 		return createSlotManager(new ArrayList<>(), false);
 	}
 
-	private static SlotManager createSlotManagerNotStartingNewTMs(List<AllocationID> notifiedAllocationFailures) {
+	private static SlotManager createSlotManagerNotStartingNewTMs(List<Tuple3<JobID, AllocationID, Exception>> notifiedAllocationFailures) {
 		return createSlotManager(notifiedAllocationFailures, false);
 	}
 
@@ -178,14 +185,14 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger {
 	}
 
 	private static SlotManager createSlotManager(
-			List<AllocationID> notifiedAllocationFailures,
+			List<Tuple3<JobID, AllocationID, Exception>> notifiedAllocationFailures,
 			boolean startNewTMs) {
 
 		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
 			.setAllocateResourceFunction((resourceProfile) -> startNewTMs ?
 							Collections.singleton(resourceProfile) :
 							Collections.emptyList())
-			.setNotifyAllocationFailureConsumer(tuple3 -> notifiedAllocationFailures.add(tuple3.f1))
+			.setNotifyAllocationFailureConsumer(tuple3 -> notifiedAllocationFailures.add(tuple3))
 			.build();
 
 		SlotManager slotManager = SlotManagerBuilder.newBuilder().build();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index b155ed6..a99fceb 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -842,7 +842,7 @@ public class SlotManagerTest extends TestLogger {
 				(Object value) -> {
 					try {
 						slotManager.registerSlotRequest(slotRequest);
-					} catch (SlotManagerException e) {
+					} catch (ResourceManagerException e) {
 						throw new RuntimeException("Could not register slots.", e);
 					}
 				});
@@ -953,7 +953,7 @@ public class SlotManagerTest extends TestLogger {
 				() -> {
 					try {
 						return slotManager.registerSlotRequest(slotRequest);
-					} catch (SlotManagerException e) {
+					} catch (ResourceManagerException e) {
 						throw new CompletionException(e);
 					}
 				},