You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/07/30 18:26:48 UTC
[flink] 03/06: [FLINK-13408][runtime] Let StandaloneResourceManager
start startup period upon granting leadership
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0f9d0952cd89a9866d17b9d00b5ff69738974f87
Author: Xintong Song <to...@gmail.com>
AuthorDate: Fri Jul 26 11:48:50 2019 +0200
[FLINK-13408][runtime] Let StandaloneResourceManager start startup period upon granting leadership
During the startup period, the StandaloneResourceManager won't fail slot requests if it cannot allocate new
containers.
[FLINK-13408][runtime][test] Add StandaloneResourceManagerTest#testStartUpPeriodAfterLeadershipSwitch validates that StandaloneResourceManager applies a startup period whenever the leadership is acquired.
[FLINK-13408][runtime] StandaloneResourceManager#startStartupPeriod uses RpcEndpoint#scheduleRunAsync.
This closes #9242.
---
.../clusterframework/MesosResourceManager.java | 1 -
.../runtime/resourcemanager/ResourceManager.java | 11 +-
.../resourcemanager/StandaloneResourceManager.java | 30 +++--
.../resourcemanager/slotmanager/SlotManager.java | 2 -
.../slotmanager/SlotManagerImpl.java | 5 -
.../StandaloneResourceManagerTest.java | 86 ++++++++++----
.../SlotManagerFailUnfulfillableTest.java | 6 +-
.../slotmanager/TestingSlotManager.java | 131 +++++++++++++++++++++
.../slotmanager/TestingSlotManagerFactory.java | 38 ++++++
.../utils/MockResourceManagerRuntimeServices.java | 20 +++-
.../org/apache/flink/yarn/YarnResourceManager.java | 1 -
11 files changed, 277 insertions(+), 54 deletions(-)
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 0bb8d41..4604507 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -197,7 +197,6 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
this.workersBeingReturned = new HashMap<>(8);
this.slotsPerWorker = createWorkerSlotProfiles(flinkConfig);
- setFailUnfulfillableRequest(true);
}
protected ActorRef createSelfActor() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 1a77a98..ec71e85 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -926,9 +926,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
setFencingToken(newResourceManagerId);
- startHeartbeatServices();
-
- slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
+ startServicesOnLeadership();
return prepareLeadershipAsync().thenApply(ignored -> true);
} else {
@@ -936,6 +934,12 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
}
}
+ protected void startServicesOnLeadership() {
+ startHeartbeatServices();
+
+ slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
+ }
+
/**
* Callback method when current resourceManager loses leadership.
*/
@@ -952,6 +956,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
slotManager.suspend();
stopHeartbeatServices();
+
});
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index bc0e8df..cab53d3 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -80,16 +80,7 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
@Override
protected void initialize() throws ResourceManagerException {
- final long startupPeriodMillis = startupPeriodTime.toMilliseconds();
-
- if (startupPeriodMillis > 0) {
- getRpcService().getScheduledExecutor().schedule(
- () -> getMainThreadExecutor().execute(
- () -> setFailUnfulfillableRequest(true)),
- startupPeriodMillis,
- TimeUnit.MILLISECONDS
- );
- }
+ // nothing to initialize
}
@Override
@@ -112,4 +103,23 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
return resourceID;
}
+ @Override
+ protected void startServicesOnLeadership() {
+ super.startServicesOnLeadership();
+ startStartupPeriod();
+ }
+
+ private void startStartupPeriod() {
+ setFailUnfulfillableRequest(false);
+
+ final long startupPeriodMillis = startupPeriodTime.toMilliseconds();
+
+ if (startupPeriodMillis > 0) {
+ scheduleRunAsync(
+ () -> setFailUnfulfillableRequest(true),
+ startupPeriodMillis,
+ TimeUnit.MILLISECONDS
+ );
+ }
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 5b709d3..0bdbc5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -54,8 +54,6 @@ public interface SlotManager extends AutoCloseable {
int getNumberPendingSlotRequests();
- boolean isFailingUnfulfillableRequest();
-
/**
* Starts the slot manager with the given leader id and resource manager actions.
*
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
index 376d030..4e4124b 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
@@ -194,11 +194,6 @@ public class SlotManagerImpl implements SlotManager {
return pendingSlotRequests.size();
}
- @Override
- public boolean isFailingUnfulfillableRequest() {
- return failUnfulfillableRequest;
- }
-
@VisibleForTesting
public int getNumberAssignedPendingTaskManagerSlots() {
return (int) pendingSlots.values().stream().filter(slot -> slot.getAssignedPendingSlotRequest() != null).count();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
index 37dbccc..89b6fa6 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.TestingSlotManagerFactory;
import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
@@ -41,7 +42,7 @@ import org.junit.Test;
import java.time.Duration;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@@ -61,33 +62,64 @@ public class StandaloneResourceManagerTest extends TestLogger {
@Test
public void testStartupPeriod() throws Exception {
- final TestingStandaloneResourceManager rm = createResourceManager(Time.milliseconds(1L));
- final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10L));
+ final LinkedBlockingQueue<Boolean> setFailUnfulfillableRequestInvokes = new LinkedBlockingQueue<>();
+ final SlotManager slotManager = new TestingSlotManagerFactory()
+ .setSetFailUnfulfillableRequestConsumer(invoke -> setFailUnfulfillableRequestInvokes.add(invoke))
+ .createSlotManager();
+ final TestingStandaloneResourceManager rm = createResourceManager(Time.milliseconds(1L), slotManager);
- assertHappensUntil(() -> rm.isFailingUnfulfillableRequestAsync().join(), deadline);
+ final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(1));
+ assertHappensUntil(() -> setFailUnfulfillableRequestInvokes.size() == 2, deadline);
+ assertThat(setFailUnfulfillableRequestInvokes.poll(), is(false));
+ assertThat(setFailUnfulfillableRequestInvokes.poll(), is(true));
rm.close();
}
@Test
public void testNoStartupPeriod() throws Exception {
- final TestingStandaloneResourceManager rm = createResourceManager(Time.milliseconds(-1L));
+ final LinkedBlockingQueue<Boolean> setFailUnfulfillableRequestInvokes = new LinkedBlockingQueue<>();
+ final SlotManager slotManager = new TestingSlotManagerFactory()
+ .setSetFailUnfulfillableRequestConsumer(invoke -> setFailUnfulfillableRequestInvokes.add(invoke))
+ .createSlotManager();
+ final TestingStandaloneResourceManager rm = createResourceManager(Time.milliseconds(-1L), slotManager);
- // startup includes initialization and granting leadership, so by the time we are
- // here, the initialization method scheduling the startup period will have been executed.
-
- assertThat(fatalErrorHandler.hasExceptionOccurred(), is(false));
-
- assertThat(rm.isFailingUnfulfillableRequestAsync().join(), is(false));
+ final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(1));
+ assertNotHappens(() -> setFailUnfulfillableRequestInvokes.size() > 1, deadline);
+ assertThat(setFailUnfulfillableRequestInvokes.size(), is(1));
+ assertThat(setFailUnfulfillableRequestInvokes.poll(), is(false));
rm.close();
}
- private TestingStandaloneResourceManager createResourceManager(Time startupPeriod) throws Exception {
+ @Test
+ public void testStartUpPeriodAfterLeadershipSwitch() throws Exception {
+ final LinkedBlockingQueue<Boolean> setFailUnfulfillableRequestInvokes = new LinkedBlockingQueue<>();
+ final SlotManager slotManager = new TestingSlotManagerFactory()
+ .setSetFailUnfulfillableRequestConsumer(invoke -> setFailUnfulfillableRequestInvokes.add(invoke))
+ .createSlotManager();
+ final TestingStandaloneResourceManager rm = createResourceManager(Time.milliseconds(1L), slotManager);
+
+ final Deadline deadline1 = Deadline.fromNow(Duration.ofSeconds(1));
+ assertHappensUntil(() -> setFailUnfulfillableRequestInvokes.size() == 2, deadline1);
+ assertThat(setFailUnfulfillableRequestInvokes.poll(), is(false));
+ assertThat(setFailUnfulfillableRequestInvokes.poll(), is(true));
+
+ rm.rmServices.revokeLeadership();
+ rm.rmServices.grantLeadership();
+
+ final Deadline deadline2 = Deadline.fromNow(Duration.ofSeconds(1L));
+ assertHappensUntil(() -> setFailUnfulfillableRequestInvokes.size() == 2, deadline2);
+ assertThat(setFailUnfulfillableRequestInvokes.poll(), is(false));
+ assertThat(setFailUnfulfillableRequestInvokes.poll(), is(true));
+ }
+
+ private TestingStandaloneResourceManager createResourceManager(Time startupPeriod, SlotManager slotManager) throws Exception {
final MockResourceManagerRuntimeServices rmServices = new MockResourceManagerRuntimeServices(
RPC_SERVICE.getTestingRpcService(),
- TIMEOUT);
+ TIMEOUT,
+ slotManager);
final TestingStandaloneResourceManager rm = new TestingStandaloneResourceManager(
rmServices.rpcService,
@@ -101,7 +133,8 @@ public class StandaloneResourceManagerTest extends TestLogger {
new ClusterInformation("localhost", 1234),
fatalErrorHandler,
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
- startupPeriod);
+ startupPeriod,
+ rmServices);
rm.start();
rmServices.grantLeadership();
@@ -120,8 +153,20 @@ public class StandaloneResourceManagerTest extends TestLogger {
}
}
+ private static void assertNotHappens(
+ SupplierWithException<Boolean, InterruptedException> condition,
+ Deadline until) throws InterruptedException {
+ while (!condition.get()) {
+ if (!until.hasTimeLeft()) {
+ return;
+ }
+ Thread.sleep(2);
+ }
+ fail("condition was fulfilled before the deadline");
+ }
+
private static class TestingStandaloneResourceManager extends StandaloneResourceManager {
- private final SlotManager slotManagerForTest;
+ private final MockResourceManagerRuntimeServices rmServices;
private TestingStandaloneResourceManager(
RpcService rpcService,
@@ -135,7 +180,8 @@ public class StandaloneResourceManagerTest extends TestLogger {
ClusterInformation clusterInformation,
FatalErrorHandler fatalErrorHandler,
JobManagerMetricGroup jobManagerMetricGroup,
- Time startupPeriodTime) {
+ Time startupPeriodTime,
+ MockResourceManagerRuntimeServices rmServices) {
super(
rpcService,
resourceManagerEndpointId,
@@ -149,13 +195,7 @@ public class StandaloneResourceManagerTest extends TestLogger {
fatalErrorHandler,
jobManagerMetricGroup,
startupPeriodTime);
- slotManagerForTest = slotManager;
- }
-
- private CompletableFuture<Boolean> isFailingUnfulfillableRequestAsync() {
- return CompletableFuture.supplyAsync(
- slotManagerForTest::isFailingUnfulfillableRequest,
- getMainThreadExecutor());
+ this.rmServices = rmServices;
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
index cda07fe..cc4a2c1 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
@@ -54,6 +54,7 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger {
final ResourceProfile fulfillableProfile = new ResourceProfile(1.0, 100);
final SlotManager slotManager = createSlotManagerNotStartingNewTMs();
+ slotManager.setFailUnfulfillableRequest(false);
registerFreeSlot(slotManager, availableProfile);
slotManager.registerSlotRequest(slotRequest(fulfillableProfile));
@@ -74,6 +75,7 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger {
final List<AllocationID> allocationFailures = new ArrayList<>();
final SlotManager slotManager = createSlotManagerNotStartingNewTMs(allocationFailures);
+ slotManager.setFailUnfulfillableRequest(false);
registerFreeSlot(slotManager, availableProfile);
// test
@@ -94,6 +96,7 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger {
final ResourceProfile newTmProfile = new ResourceProfile(2.0, 200);
final SlotManager slotManager = createSlotManagerStartingNewTMs();
+ slotManager.setFailUnfulfillableRequest(false);
registerFreeSlot(slotManager, availableProfile);
// test
@@ -111,7 +114,6 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger {
final SlotManager slotManager = createSlotManagerNotStartingNewTMs();
registerFreeSlot(slotManager, availableProfile);
- slotManager.setFailUnfulfillableRequest(true);
// test
slotManager.registerSlotRequest(slotRequest(availableProfile));
@@ -130,7 +132,6 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger {
final List<AllocationID> notifiedAllocationFailures = new ArrayList<>();
final SlotManager slotManager = createSlotManagerNotStartingNewTMs(notifiedAllocationFailures);
registerFreeSlot(slotManager, availableProfile);
- slotManager.setFailUnfulfillableRequest(true);
// test
try {
@@ -152,7 +153,6 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger {
final SlotManager slotManager = createSlotManagerStartingNewTMs();
registerFreeSlot(slotManager, availableProfile);
- slotManager.setFailUnfulfillableRequest(true);
// test
slotManager.registerSlotRequest(slotRequest(newTmProfile));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
new file mode 100644
index 0000000..a07846e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of {@link SlotManager} for testing purpose.
+ */
+public class TestingSlotManager implements SlotManager {
+
+ private final Consumer<Boolean> setFailUnfulfillableRequestConsumer;
+
+ public TestingSlotManager(@Nullable Consumer<Boolean> setFailUnfulfillableRequestConsumer) {
+ this.setFailUnfulfillableRequestConsumer = setFailUnfulfillableRequestConsumer;
+ }
+
+ @Override
+ public int getNumberRegisteredSlots() {
+ return 0;
+ }
+
+ @Override
+ public int getNumberRegisteredSlotsOf(InstanceID instanceId) {
+ return 0;
+ }
+
+ @Override
+ public int getNumberFreeSlots() {
+ return 0;
+ }
+
+ @Override
+ public int getNumberFreeSlotsOf(InstanceID instanceId) {
+ return 0;
+ }
+
+ @Override
+ public int getNumberPendingTaskManagerSlots() {
+ return 0;
+ }
+
+ @Override
+ public int getNumberPendingSlotRequests() {
+ return 0;
+ }
+
+ @Override
+ public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
+
+ }
+
+ @Override
+ public void suspend() {
+
+ }
+
+ @Override
+ public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException {
+ return false;
+ }
+
+ @Override
+ public boolean unregisterSlotRequest(AllocationID allocationId) {
+ return false;
+ }
+
+ @Override
+ public void registerTaskManager(TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
+
+ }
+
+ @Override
+ public boolean unregisterTaskManager(InstanceID instanceId) {
+ return false;
+ }
+
+ @Override
+ public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) {
+ return false;
+ }
+
+ @Override
+ public void freeSlot(SlotID slotId, AllocationID allocationId) {
+
+ }
+
+ @Override
+ public void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) {
+ if (setFailUnfulfillableRequestConsumer != null) {
+ setFailUnfulfillableRequestConsumer.accept(failUnfulfillableRequest);
+ }
+ }
+
+ @Override
+ public void unregisterTaskManagersAndReleaseResources() {
+
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManagerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManagerFactory.java
new file mode 100644
index 0000000..a3e6ab4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManagerFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import java.util.function.Consumer;
+
+/**
+ * Factory for {@link TestingSlotManager}.
+ */
+public class TestingSlotManagerFactory {
+
+ private Consumer<Boolean> setFailUnfulfillableRequestConsumer;
+
+ public TestingSlotManagerFactory setSetFailUnfulfillableRequestConsumer(Consumer<Boolean> setFailUnfulfillableRequestConsumer) {
+ this.setFailUnfulfillableRequestConsumer = setFailUnfulfillableRequestConsumer;
+ return this;
+ }
+
+ public TestingSlotManager createSlotManager() {
+ return new TestingSlotManager(setFailUnfulfillableRequestConsumer);
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java
index 0b1a9bf..737decd 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java
@@ -52,19 +52,23 @@ public class MockResourceManagerRuntimeServices {
public final SlotManager slotManager;
public MockResourceManagerRuntimeServices(RpcService rpcService, Time timeout) {
+ this(rpcService, timeout, SlotManagerBuilder.newBuilder()
+ .setScheduledExecutor(new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService()))
+ .setTaskManagerRequestTimeout(Time.seconds(10))
+ .setSlotRequestTimeout(Time.seconds(10))
+ .setTaskManagerTimeout(Time.minutes(1))
+ .build());
+ }
+
+ public MockResourceManagerRuntimeServices(RpcService rpcService, Time timeout, SlotManager slotManager) {
this.rpcService = checkNotNull(rpcService);
this.timeout = checkNotNull(timeout);
+ this.slotManager = slotManager;
highAvailabilityServices = new TestingHighAvailabilityServices();
rmLeaderElectionService = new TestingLeaderElectionService();
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
heartbeatServices = new TestingHeartbeatServices();
metricRegistry = NoOpMetricRegistry.INSTANCE;
- slotManager = SlotManagerBuilder.newBuilder()
- .setScheduledExecutor(new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService()))
- .setTaskManagerRequestTimeout(Time.seconds(10))
- .setSlotRequestTimeout(Time.seconds(10))
- .setTaskManagerTimeout(Time.minutes(1))
- .build();
jobLeaderIdService = new JobLeaderIdService(
highAvailabilityServices,
rpcService.getScheduledExecutor(),
@@ -75,4 +79,8 @@ public class MockResourceManagerRuntimeServices {
UUID rmLeaderSessionId = UUID.randomUUID();
rmLeaderElectionService.isLeader(rmLeaderSessionId).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
}
+
+ public void revokeLeadership() {
+ rmLeaderElectionService.notLeader();
+ }
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index e9a1f2d..c7cb064 100755
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -185,7 +185,6 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
this.resource = Resource.newInstance(defaultTaskManagerMemoryMB, defaultCpus);
this.slotsPerWorker = createWorkerSlotProfiles(flinkConfig);
- setFailUnfulfillableRequest(true);
}
protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(