You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/05 01:59:57 UTC
[08/10] flink git commit: [FLINK-5140] [JobManager] SlotPool accepts allocation requests while ResourceManager is not connected
[FLINK-5140] [JobManager] SlotPool accepts allocation requests while ResourceManager is not connected
The requests are held for up to the ResourceManager request timeout; once a ResourceManager connects, the held requests are forwarded to it.
If no ResourceManager connects within that timeout, the held allocation requests fail with a NoResourceAvailableException.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b3283ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b3283ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b3283ec
Branch: refs/heads/flip-6
Commit: 6b3283ecd980e3db5d5b6cca86885d0dfad6e2cd
Parents: 82c1fcf
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 2 16:17:11 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100
----------------------------------------------------------------------
.../apache/flink/runtime/instance/SlotPool.java | 76 ++++++++++++--
.../flink/runtime/instance/SlotPoolRpcTest.java | 101 +++++++++++++++++++
.../flink/runtime/instance/SlotPoolTest.java | 27 -----
3 files changed, 166 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6b3283ec/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 65a5c45..1a2adfe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -93,8 +93,6 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
// ------------------------------------------------------------------------
- private final Object lock = new Object();
-
private final JobID jobId;
private final ProviderAndOwner providerAndOwner;
@@ -111,6 +109,9 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
/** All pending requests waiting for slots */
private final HashMap<AllocationID, PendingRequest> pendingRequests;
+ /** The requests that are waiting for the resource manager to be connected */
+ private final HashMap<AllocationID, PendingRequest> waitingForResourceManager;
+
/** Timeout for request calls to the ResourceManager */
private final Time resourceManagerRequestsTimeout;
@@ -154,6 +155,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
this.allocatedSlots = new AllocatedSlots();
this.availableSlots = new AvailableSlots();
this.pendingRequests = new HashMap<>();
+ this.waitingForResourceManager = new HashMap<>();
this.providerAndOwner = new ProviderAndOwner(getSelf(), slotRequestTimeout);
}
@@ -233,6 +235,14 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
public void connectToResourceManager(UUID resourceManagerLeaderId, ResourceManagerGateway resourceManagerGateway) {
this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId);
this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
+
+ // work on all slots waiting for this connection
+ for (PendingRequest pending : waitingForResourceManager.values()) {
+ requestSlotFromResourceManager(pending.allocationID(), pending.future(), pending.resourceProfile());
+ }
+
+ // all sent off
+ waitingForResourceManager.clear();
}
@RpcMethod
@@ -273,16 +283,27 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
return FlinkCompletableFuture.completed(slot);
}
- // (2) no slot available, and no resource manager connection
+ // the request will be completed by a future
+ final AllocationID allocationID = new AllocationID();
+ final FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+
+ // (2) need to request a slot
+
if (resourceManagerGateway == null) {
- return FlinkCompletableFuture.completedExceptionally(
- new NoResourceAvailableException("not connected to ResourceManager and no slot available"));
-
+ // no slot available, and no resource manager connection
+ stashRequestWaitingForResourceManager(allocationID, resources, future);
+ } else {
+ // we have a resource manager connection, so let's ask it for more resources
+ requestSlotFromResourceManager(allocationID, future, resources);
}
- // (3) we have a resource manager connection, so let's ask it for more resources
- final FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
- final AllocationID allocationID = new AllocationID();
+ return future;
+ }
+
+ private void requestSlotFromResourceManager(
+ final AllocationID allocationID,
+ final FlinkCompletableFuture<SimpleSlot> future,
+ final ResourceProfile resources) {
LOG.info("Requesting slot with profile {} from resource manager (request = {}).", resources, allocationID);
@@ -327,8 +348,6 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
return null;
}
}, getMainThreadExecutor());
-
- return future;
}
private void slotRequestToResourceManagerSuccess(final AllocationID allocationID) {
@@ -357,6 +376,32 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
}
}
+ private void stashRequestWaitingForResourceManager(
+ final AllocationID allocationID,
+ final ResourceProfile resources,
+ final FlinkCompletableFuture<SimpleSlot> future) {
+
+ LOG.info("Cannot serve slot request, no ResourceManager connected. " +
+ "Adding as pending request {}", allocationID);
+
+ waitingForResourceManager.put(allocationID, new PendingRequest(allocationID, future, resources));
+
+ scheduleRunAsync(new Runnable() {
+ @Override
+ public void run() {
+ checkTimeoutRequestWaitingForResourceManager(allocationID);
+ }
+ }, resourceManagerRequestsTimeout);
+ }
+
+ private void checkTimeoutRequestWaitingForResourceManager(AllocationID allocationID) {
+ PendingRequest request = waitingForResourceManager.remove(allocationID);
+ if (request != null && !request.future().isDone()) {
+ request.future().completeExceptionally(new NoResourceAvailableException(
+ "No slot available and no connection to Resource Manager established."));
+ }
+ }
+
// ------------------------------------------------------------------------
// Slot releasing & offering
// ------------------------------------------------------------------------
@@ -401,6 +446,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
private PendingRequest pollMatchingPendingRequest(final AllocatedSlot slot) {
final ResourceProfile slotResources = slot.getResourceProfile();
+ // try the requests sent to the resource manager first
for (PendingRequest request : pendingRequests.values()) {
if (slotResources.isMatching(request.resourceProfile())) {
pendingRequests.remove(request.allocationID());
@@ -408,6 +454,14 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
}
}
+ // try the requests waiting for a resource manager connection next
+ for (PendingRequest request : waitingForResourceManager.values()) {
+ if (slotResources.isMatching(request.resourceProfile())) {
+ waitingForResourceManager.remove(request.allocationID());
+ return request;
+ }
+ }
+
// no request pending, or no request matches
return null;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b3283ec/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
new file mode 100644
index 0000000..89fd22f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.util.clock.SystemClock;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the SlotPool using a proper RPC setup.
+ */
+public class SlotPoolRpcTest {
+
+ private static RpcService rpcService;
+
+ // ------------------------------------------------------------------------
+ // setup
+ // ------------------------------------------------------------------------
+
+ @BeforeClass
+ public static void setup() {
+ ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+ rpcService = new AkkaRpcService(actorSystem, Time.seconds(10));
+ }
+
+ @AfterClass
+ public static void shutdown() {
+ rpcService.stopService();
+ }
+
+ // ------------------------------------------------------------------------
+ // tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testSlotAllocationNoResourceManager() throws Exception {
+ final JobID jid = new JobID();
+
+ final SlotPool pool = new SlotPool(
+ rpcService, jid,
+ SystemClock.getInstance(),
+ Time.days(1), Time.days(1),
+ Time.milliseconds(100) // this is the timeout for the request tested here
+ );
+ pool.start(UUID.randomUUID());
+
+ Future<SimpleSlot> future = pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null);
+
+ try {
+ future.get(4, TimeUnit.SECONDS);
+ fail("We expected a ExecutionException.");
+ }
+ catch (ExecutionException e) {
+ assertEquals(NoResourceAvailableException.class, e.getCause().getClass());
+ }
+ catch (TimeoutException e) {
+ fail("future timed out rather than being failed");
+ }
+ catch (Exception e) {
+ fail("wrong exception: " + e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b3283ec/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 5fa7af3..97457e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
@@ -42,17 +41,13 @@ import org.mockito.ArgumentCaptor;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -123,28 +118,6 @@ public class SlotPoolTest extends TestLogger {
}
@Test
- public void testAllocateSlotWithoutResourceManager() throws Exception {
- slotPool.disconnectResourceManager();
- Future<SimpleSlot> future = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
- future.handleAsync(
- new BiFunction<SimpleSlot, Throwable, Void>() {
- @Override
- public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
- assertNull(simpleSlot);
- assertNotNull(throwable);
- return null;
- }
- },
- rpcService.getExecutor());
- try {
- future.get(1, TimeUnit.SECONDS);
- fail("We expected a ExecutionException.");
- } catch (ExecutionException ex) {
- // we expect the exception
- }
- }
-
- @Test
public void testAllocationFulfilledByReturnedSlot() throws Exception {
ResourceID resourceID = new ResourceID("resource");
slotPool.registerTaskManager(resourceID);