You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/07/30 18:26:48 UTC

[flink] 03/06: [FLINK-13408][runtime] Let StandaloneResourceManager start startup period upon granting leadership

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0f9d0952cd89a9866d17b9d00b5ff69738974f87
Author: Xintong Song <to...@gmail.com>
AuthorDate: Fri Jul 26 11:48:50 2019 +0200

    [FLINK-13408][runtime] Let StandaloneResourceManager start startup period upon granting leadership
    
    During the startup period, the StandaloneResourceManager won't fail slot requests if it cannot allocate new
    containers.
    
    [FLINK-13408][runtime][test] Add StandaloneResourceManagerTest#testStartUpPeriodAfterLeadershipSwitch, which validates that StandaloneResourceManager applies a startup period whenever the leadership is acquired.
    
    [FLINK-13408][runtime] StandaloneResourceManager#startStartupPeriod uses RpcEndpoint#scheduleRunAsync.
    
    This closes #9242.
---
 .../clusterframework/MesosResourceManager.java     |   1 -
 .../runtime/resourcemanager/ResourceManager.java   |  11 +-
 .../resourcemanager/StandaloneResourceManager.java |  30 +++--
 .../resourcemanager/slotmanager/SlotManager.java   |   2 -
 .../slotmanager/SlotManagerImpl.java               |   5 -
 .../StandaloneResourceManagerTest.java             |  86 ++++++++++----
 .../SlotManagerFailUnfulfillableTest.java          |   6 +-
 .../slotmanager/TestingSlotManager.java            | 131 +++++++++++++++++++++
 .../slotmanager/TestingSlotManagerFactory.java     |  38 ++++++
 .../utils/MockResourceManagerRuntimeServices.java  |  20 +++-
 .../org/apache/flink/yarn/YarnResourceManager.java |   1 -
 11 files changed, 277 insertions(+), 54 deletions(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 0bb8d41..4604507 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -197,7 +197,6 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 		this.workersBeingReturned = new HashMap<>(8);
 
 		this.slotsPerWorker = createWorkerSlotProfiles(flinkConfig);
-		setFailUnfulfillableRequest(true);
 	}
 
 	protected ActorRef createSelfActor() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 1a77a98..ec71e85 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -926,9 +926,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 
 			setFencingToken(newResourceManagerId);
 
-			startHeartbeatServices();
-
-			slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
+			startServicesOnLeadership();
 
 			return prepareLeadershipAsync().thenApply(ignored -> true);
 		} else {
@@ -936,6 +934,12 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		}
 	}
 
+	protected void startServicesOnLeadership() {
+		startHeartbeatServices();
+
+		slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
+	}
+
 	/**
 	 * Callback method when current resourceManager loses leadership.
 	 */
@@ -952,6 +956,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 				slotManager.suspend();
 
 				stopHeartbeatServices();
+
 			});
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index bc0e8df..cab53d3 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -80,16 +80,7 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 
 	@Override
 	protected void initialize() throws ResourceManagerException {
-		final long startupPeriodMillis = startupPeriodTime.toMilliseconds();
-
-		if (startupPeriodMillis > 0) {
-			getRpcService().getScheduledExecutor().schedule(
-				() -> getMainThreadExecutor().execute(
-					() -> setFailUnfulfillableRequest(true)),
-				startupPeriodMillis,
-				TimeUnit.MILLISECONDS
-			);
-		}
+		// nothing to initialize
 	}
 
 	@Override
@@ -112,4 +103,23 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 		return resourceID;
 	}
 
+	@Override
+	protected void startServicesOnLeadership() {
+		super.startServicesOnLeadership();
+		startStartupPeriod();
+	}
+
+	private void startStartupPeriod() {
+		setFailUnfulfillableRequest(false);
+
+		final long startupPeriodMillis = startupPeriodTime.toMilliseconds();
+
+		if (startupPeriodMillis > 0) {
+			scheduleRunAsync(
+				() -> setFailUnfulfillableRequest(true),
+				startupPeriodMillis,
+				TimeUnit.MILLISECONDS
+			);
+		}
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 5b709d3..0bdbc5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -54,8 +54,6 @@ public interface SlotManager extends AutoCloseable {
 
 	int getNumberPendingSlotRequests();
 
-	boolean isFailingUnfulfillableRequest();
-
 	/**
 	 * Starts the slot manager with the given leader id and resource manager actions.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
index 376d030..4e4124b 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
@@ -194,11 +194,6 @@ public class SlotManagerImpl implements SlotManager {
 		return pendingSlotRequests.size();
 	}
 
-	@Override
-	public boolean isFailingUnfulfillableRequest() {
-		return failUnfulfillableRequest;
-	}
-
 	@VisibleForTesting
 	public int getNumberAssignedPendingTaskManagerSlots() {
 		return (int) pendingSlots.values().stream().filter(slot -> slot.getAssignedPendingSlotRequest() != null).count();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
index 37dbccc..89b6fa6 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.TestingSlotManagerFactory;
 import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -41,7 +42,7 @@ import org.junit.Test;
 
 import java.time.Duration;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
@@ -61,33 +62,64 @@ public class StandaloneResourceManagerTest extends TestLogger {
 
 	@Test
 	public void testStartupPeriod() throws Exception {
-		final TestingStandaloneResourceManager rm = createResourceManager(Time.milliseconds(1L));
-		final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10L));
+		final LinkedBlockingQueue<Boolean> setFailUnfulfillableRequestInvokes = new LinkedBlockingQueue<>();
+		final SlotManager slotManager = new TestingSlotManagerFactory()
+			.setSetFailUnfulfillableRequestConsumer(invoke -> setFailUnfulfillableRequestInvokes.add(invoke))
+			.createSlotManager();
+		final TestingStandaloneResourceManager rm = createResourceManager(Time.milliseconds(1L), slotManager);
 
-		assertHappensUntil(() -> rm.isFailingUnfulfillableRequestAsync().join(), deadline);
+		final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(1));
+		assertHappensUntil(() -> setFailUnfulfillableRequestInvokes.size() == 2, deadline);
+		assertThat(setFailUnfulfillableRequestInvokes.poll(), is(false));
+		assertThat(setFailUnfulfillableRequestInvokes.poll(), is(true));
 
 		rm.close();
 	}
 
 	@Test
 	public void testNoStartupPeriod() throws Exception {
-		final TestingStandaloneResourceManager rm = createResourceManager(Time.milliseconds(-1L));
+		final LinkedBlockingQueue<Boolean> setFailUnfulfillableRequestInvokes = new LinkedBlockingQueue<>();
+		final SlotManager slotManager = new TestingSlotManagerFactory()
+			.setSetFailUnfulfillableRequestConsumer(invoke -> setFailUnfulfillableRequestInvokes.add(invoke))
+			.createSlotManager();
+		final TestingStandaloneResourceManager rm = createResourceManager(Time.milliseconds(-1L), slotManager);
 
-		// startup includes initialization and granting leadership, so by the time we are
-		// here, the initialization method scheduling the startup period will have been executed.
-
-		assertThat(fatalErrorHandler.hasExceptionOccurred(), is(false));
-
-		assertThat(rm.isFailingUnfulfillableRequestAsync().join(), is(false));
+		final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(1));
+		assertNotHappens(() -> setFailUnfulfillableRequestInvokes.size() > 1, deadline);
+		assertThat(setFailUnfulfillableRequestInvokes.size(), is(1));
+		assertThat(setFailUnfulfillableRequestInvokes.poll(), is(false));
 
 		rm.close();
 	}
 
-	private TestingStandaloneResourceManager createResourceManager(Time startupPeriod) throws Exception {
+	@Test
+	public void testStartUpPeriodAfterLeadershipSwitch() throws Exception {
+		final LinkedBlockingQueue<Boolean> setFailUnfulfillableRequestInvokes = new LinkedBlockingQueue<>();
+		final SlotManager slotManager = new TestingSlotManagerFactory()
+			.setSetFailUnfulfillableRequestConsumer(invoke -> setFailUnfulfillableRequestInvokes.add(invoke))
+			.createSlotManager();
+		final TestingStandaloneResourceManager rm = createResourceManager(Time.milliseconds(1L), slotManager);
+
+		final Deadline deadline1 = Deadline.fromNow(Duration.ofSeconds(1));
+		assertHappensUntil(() -> setFailUnfulfillableRequestInvokes.size() == 2, deadline1);
+		assertThat(setFailUnfulfillableRequestInvokes.poll(), is(false));
+		assertThat(setFailUnfulfillableRequestInvokes.poll(), is(true));
+
+		rm.rmServices.revokeLeadership();
+		rm.rmServices.grantLeadership();
+
+		final Deadline deadline2 = Deadline.fromNow(Duration.ofSeconds(1L));
+		assertHappensUntil(() -> setFailUnfulfillableRequestInvokes.size() == 2, deadline2);
+		assertThat(setFailUnfulfillableRequestInvokes.poll(), is(false));
+		assertThat(setFailUnfulfillableRequestInvokes.poll(), is(true));
+	}
+
+	private TestingStandaloneResourceManager createResourceManager(Time startupPeriod, SlotManager slotManager) throws Exception {
 
 		final MockResourceManagerRuntimeServices rmServices = new MockResourceManagerRuntimeServices(
 			RPC_SERVICE.getTestingRpcService(),
-			TIMEOUT);
+			TIMEOUT,
+			slotManager);
 
 		final TestingStandaloneResourceManager rm = new TestingStandaloneResourceManager(
 			rmServices.rpcService,
@@ -101,7 +133,8 @@ public class StandaloneResourceManagerTest extends TestLogger {
 			new ClusterInformation("localhost", 1234),
 			fatalErrorHandler,
 			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
-			startupPeriod);
+			startupPeriod,
+			rmServices);
 
 		rm.start();
 		rmServices.grantLeadership();
@@ -120,8 +153,20 @@ public class StandaloneResourceManagerTest extends TestLogger {
 		}
 	}
 
+	private static void assertNotHappens(
+		SupplierWithException<Boolean, InterruptedException> condition,
+		Deadline until) throws InterruptedException {
+		while (!condition.get()) {
+			if (!until.hasTimeLeft()) {
+				return;
+			}
+			Thread.sleep(2);
+		}
+		fail("condition was fulfilled before the deadline");
+	}
+
 	private static class TestingStandaloneResourceManager extends StandaloneResourceManager {
-		private final SlotManager slotManagerForTest;
+		private final MockResourceManagerRuntimeServices rmServices;
 
 		private TestingStandaloneResourceManager(
 				RpcService rpcService,
@@ -135,7 +180,8 @@ public class StandaloneResourceManagerTest extends TestLogger {
 				ClusterInformation clusterInformation,
 				FatalErrorHandler fatalErrorHandler,
 				JobManagerMetricGroup jobManagerMetricGroup,
-				Time startupPeriodTime) {
+				Time startupPeriodTime,
+				MockResourceManagerRuntimeServices rmServices) {
 			super(
 				rpcService,
 				resourceManagerEndpointId,
@@ -149,13 +195,7 @@ public class StandaloneResourceManagerTest extends TestLogger {
 				fatalErrorHandler,
 				jobManagerMetricGroup,
 				startupPeriodTime);
-			slotManagerForTest = slotManager;
-		}
-
-		private CompletableFuture<Boolean> isFailingUnfulfillableRequestAsync() {
-			return CompletableFuture.supplyAsync(
-				slotManagerForTest::isFailingUnfulfillableRequest,
-				getMainThreadExecutor());
+			this.rmServices = rmServices;
 		}
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
index cda07fe..cc4a2c1 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
@@ -54,6 +54,7 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger {
 		final ResourceProfile fulfillableProfile = new ResourceProfile(1.0, 100);
 
 		final SlotManager slotManager = createSlotManagerNotStartingNewTMs();
+		slotManager.setFailUnfulfillableRequest(false);
 		registerFreeSlot(slotManager, availableProfile);
 
 		slotManager.registerSlotRequest(slotRequest(fulfillableProfile));
@@ -74,6 +75,7 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger {
 
 		final List<AllocationID> allocationFailures = new ArrayList<>();
 		final SlotManager slotManager = createSlotManagerNotStartingNewTMs(allocationFailures);
+		slotManager.setFailUnfulfillableRequest(false);
 		registerFreeSlot(slotManager, availableProfile);
 
 		// test
@@ -94,6 +96,7 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger {
 		final ResourceProfile newTmProfile = new ResourceProfile(2.0, 200);
 
 		final SlotManager slotManager = createSlotManagerStartingNewTMs();
+		slotManager.setFailUnfulfillableRequest(false);
 		registerFreeSlot(slotManager, availableProfile);
 
 		// test
@@ -111,7 +114,6 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger {
 
 		final SlotManager slotManager = createSlotManagerNotStartingNewTMs();
 		registerFreeSlot(slotManager, availableProfile);
-		slotManager.setFailUnfulfillableRequest(true);
 
 		// test
 		slotManager.registerSlotRequest(slotRequest(availableProfile));
@@ -130,7 +132,6 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger {
 		final List<AllocationID> notifiedAllocationFailures = new ArrayList<>();
 		final SlotManager slotManager = createSlotManagerNotStartingNewTMs(notifiedAllocationFailures);
 		registerFreeSlot(slotManager, availableProfile);
-		slotManager.setFailUnfulfillableRequest(true);
 
 		// test
 		try {
@@ -152,7 +153,6 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger {
 
 		final SlotManager slotManager = createSlotManagerStartingNewTMs();
 		registerFreeSlot(slotManager, availableProfile);
-		slotManager.setFailUnfulfillableRequest(true);
 
 		// test
 		slotManager.registerSlotRequest(slotRequest(newTmProfile));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
new file mode 100644
index 0000000..a07846e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of {@link SlotManager} for testing purpose.
+ */
+public class TestingSlotManager implements SlotManager {
+
+	private final Consumer<Boolean> setFailUnfulfillableRequestConsumer;
+
+	public TestingSlotManager(@Nullable Consumer<Boolean> setFailUnfulfillableRequestConsumer) {
+		this.setFailUnfulfillableRequestConsumer = setFailUnfulfillableRequestConsumer;
+	}
+
+	@Override
+	public int getNumberRegisteredSlots() {
+		return 0;
+	}
+
+	@Override
+	public int getNumberRegisteredSlotsOf(InstanceID instanceId) {
+		return 0;
+	}
+
+	@Override
+	public int getNumberFreeSlots() {
+		return 0;
+	}
+
+	@Override
+	public int getNumberFreeSlotsOf(InstanceID instanceId) {
+		return 0;
+	}
+
+	@Override
+	public int getNumberPendingTaskManagerSlots() {
+		return 0;
+	}
+
+	@Override
+	public int getNumberPendingSlotRequests() {
+		return 0;
+	}
+
+	@Override
+	public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
+
+	}
+
+	@Override
+	public void suspend() {
+
+	}
+
+	@Override
+	public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException {
+		return false;
+	}
+
+	@Override
+	public boolean unregisterSlotRequest(AllocationID allocationId) {
+		return false;
+	}
+
+	@Override
+	public void registerTaskManager(TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
+
+	}
+
+	@Override
+	public boolean unregisterTaskManager(InstanceID instanceId) {
+		return false;
+	}
+
+	@Override
+	public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) {
+		return false;
+	}
+
+	@Override
+	public void freeSlot(SlotID slotId, AllocationID allocationId) {
+
+	}
+
+	@Override
+	public void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) {
+		if (setFailUnfulfillableRequestConsumer != null) {
+			setFailUnfulfillableRequestConsumer.accept(failUnfulfillableRequest);
+		}
+	}
+
+	@Override
+	public void unregisterTaskManagersAndReleaseResources() {
+
+	}
+
+	@Override
+	public void close() throws Exception {
+
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManagerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManagerFactory.java
new file mode 100644
index 0000000..a3e6ab4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManagerFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import java.util.function.Consumer;
+
+/**
+ * Factory for {@link TestingSlotManager}.
+ */
+public class TestingSlotManagerFactory {
+
+	private Consumer<Boolean> setFailUnfulfillableRequestConsumer;
+
+	public TestingSlotManagerFactory setSetFailUnfulfillableRequestConsumer(Consumer<Boolean> setFailUnfulfillableRequestConsumer) {
+		this.setFailUnfulfillableRequestConsumer = setFailUnfulfillableRequestConsumer;
+		return this;
+	}
+
+	public TestingSlotManager createSlotManager() {
+		return new TestingSlotManager(setFailUnfulfillableRequestConsumer);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java
index 0b1a9bf..737decd 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java
@@ -52,19 +52,23 @@ public class MockResourceManagerRuntimeServices {
 	public final SlotManager slotManager;
 
 	public MockResourceManagerRuntimeServices(RpcService rpcService, Time timeout) {
+		this(rpcService, timeout, SlotManagerBuilder.newBuilder()
+			.setScheduledExecutor(new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService()))
+			.setTaskManagerRequestTimeout(Time.seconds(10))
+			.setSlotRequestTimeout(Time.seconds(10))
+			.setTaskManagerTimeout(Time.minutes(1))
+			.build());
+	}
+
+	public MockResourceManagerRuntimeServices(RpcService rpcService, Time timeout, SlotManager slotManager) {
 		this.rpcService = checkNotNull(rpcService);
 		this.timeout = checkNotNull(timeout);
+		this.slotManager = slotManager;
 		highAvailabilityServices = new TestingHighAvailabilityServices();
 		rmLeaderElectionService = new TestingLeaderElectionService();
 		highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
 		heartbeatServices = new TestingHeartbeatServices();
 		metricRegistry = NoOpMetricRegistry.INSTANCE;
-		slotManager = SlotManagerBuilder.newBuilder()
-			.setScheduledExecutor(new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService()))
-			.setTaskManagerRequestTimeout(Time.seconds(10))
-			.setSlotRequestTimeout(Time.seconds(10))
-			.setTaskManagerTimeout(Time.minutes(1))
-			.build();
 		jobLeaderIdService = new JobLeaderIdService(
 			highAvailabilityServices,
 			rpcService.getScheduledExecutor(),
@@ -75,4 +79,8 @@ public class MockResourceManagerRuntimeServices {
 		UUID rmLeaderSessionId = UUID.randomUUID();
 		rmLeaderElectionService.isLeader(rmLeaderSessionId).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 	}
+
+	public void revokeLeadership() {
+		rmLeaderElectionService.notLeader();
+	}
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index e9a1f2d..c7cb064 100755
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -185,7 +185,6 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		this.resource = Resource.newInstance(defaultTaskManagerMemoryMB, defaultCpus);
 
 		this.slotsPerWorker = createWorkerSlotProfiles(flinkConfig);
-		setFailUnfulfillableRequest(true);
 	}
 
 	protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(