You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/05 01:59:57 UTC

[08/10] flink git commit: [FLINK-5140] [JobManager] SlotPool accepts allocation requests while ResourceManager is not connected

[FLINK-5140] [JobManager] SlotPool accepts allocation requests while ResourceManager is not connected

The requests are kept for a certain time and fulfilled once the ResourceManager is connected.
If no ResourceManager is connected in time, the allocation requests are failed.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b3283ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b3283ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b3283ec

Branch: refs/heads/flip-6
Commit: 6b3283ecd980e3db5d5b6cca86885d0dfad6e2cd
Parents: 82c1fcf
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 2 16:17:11 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/instance/SlotPool.java |  76 ++++++++++++--
 .../flink/runtime/instance/SlotPoolRpcTest.java | 101 +++++++++++++++++++
 .../flink/runtime/instance/SlotPoolTest.java    |  27 -----
 3 files changed, 166 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6b3283ec/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 65a5c45..1a2adfe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -93,8 +93,6 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 
 	// ------------------------------------------------------------------------
 
-	private final Object lock = new Object();
-
 	private final JobID jobId;
 
 	private final ProviderAndOwner providerAndOwner;
@@ -111,6 +109,9 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 	/** All pending requests waiting for slots */
 	private final HashMap<AllocationID, PendingRequest> pendingRequests;
 
+	/** The requests that are waiting for the resource manager to be connected */
+	private final HashMap<AllocationID, PendingRequest> waitingForResourceManager;
+
 	/** Timeout for request calls to the ResourceManager */
 	private final Time resourceManagerRequestsTimeout;
 
@@ -154,6 +155,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 		this.allocatedSlots = new AllocatedSlots();
 		this.availableSlots = new AvailableSlots();
 		this.pendingRequests = new HashMap<>();
+		this.waitingForResourceManager = new HashMap<>();
 
 		this.providerAndOwner = new ProviderAndOwner(getSelf(), slotRequestTimeout);
 	}
@@ -233,6 +235,14 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 	public void connectToResourceManager(UUID resourceManagerLeaderId, ResourceManagerGateway resourceManagerGateway) {
 		this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId);
 		this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
+
+		// forward every request that was stashed while no ResourceManager was connected
+		for (PendingRequest pending : waitingForResourceManager.values()) {
+			requestSlotFromResourceManager(pending.allocationID(), pending.future(), pending.resourceProfile());
+		}
+
+		// everything stashed has been forwarded; the scheduled wait-timeout checks now find nothing to fail
+		waitingForResourceManager.clear();
 	}
 
 	@RpcMethod
@@ -273,16 +283,27 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 			return FlinkCompletableFuture.completed(slot);
 		}
 
-		// (2) no slot available, and no resource manager connection
+		// the request will be completed by a future
+		final AllocationID allocationID = new AllocationID();
+		final FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+
+		// (2) need to request a slot
+
 		if (resourceManagerGateway == null) {
-			return FlinkCompletableFuture.completedExceptionally(
-					new NoResourceAvailableException("not connected to ResourceManager and no slot available"));
-			
+			// no slot available, and no resource manager connection
+			stashRequestWaitingForResourceManager(allocationID, resources, future);
+		} else {
+			// we have a resource manager connection, so let's ask it for more resources
+			requestSlotFromResourceManager(allocationID, future, resources);
 		}
 
-		// (3) we have a resource manager connection, so let's ask it for more resources
-		final FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
-		final AllocationID allocationID = new AllocationID();
+		return future;
+	}
+
+	private void requestSlotFromResourceManager(
+			final AllocationID allocationID,
+			final FlinkCompletableFuture<SimpleSlot> future,
+			final ResourceProfile resources) {
 
 		LOG.info("Requesting slot with profile {} from resource manager (request = {}).", resources, allocationID);
 
@@ -327,8 +348,6 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 				return null;
 			}
 		}, getMainThreadExecutor());
-
-		return future;
 	}
 
 	private void slotRequestToResourceManagerSuccess(final AllocationID allocationID) {
@@ -357,6 +376,32 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 		}
 	}
 
+	private void stashRequestWaitingForResourceManager(
+			final AllocationID allocationID,
+			final ResourceProfile resources,
+			final FlinkCompletableFuture<SimpleSlot> future) {
+		// no ResourceManager yet: park the request until one connects, a matching slot is offered, or it times out
+		LOG.info("Cannot serve slot request, no ResourceManager connected. " +
+				"Adding as pending request {}",  allocationID);
+
+		waitingForResourceManager.put(allocationID, new PendingRequest(allocationID, future, resources));
+		// after the timeout, fail the request if it is still parked (see checkTimeoutRequestWaitingForResourceManager)
+		scheduleRunAsync(new Runnable() {
+			@Override
+			public void run() {
+				checkTimeoutRequestWaitingForResourceManager(allocationID);
+			}
+		}, resourceManagerRequestsTimeout);
+	}
+
+	private void checkTimeoutRequestWaitingForResourceManager(AllocationID allocationID) { // runs once the stash timeout has elapsed
+		PendingRequest request = waitingForResourceManager.remove(allocationID); // null if already forwarded to a ResourceManager or matched to a slot
+		if (request != null && !request.future().isDone()) {
+			request.future().completeExceptionally(new NoResourceAvailableException(
+					"No slot available and no connection to Resource Manager established."));
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Slot releasing & offering
 	// ------------------------------------------------------------------------
@@ -401,6 +446,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 	private PendingRequest pollMatchingPendingRequest(final AllocatedSlot slot) {
 		final ResourceProfile slotResources = slot.getResourceProfile();
 
+		// try the requests sent to the resource manager first
 		for (PendingRequest request : pendingRequests.values()) {
 			if (slotResources.isMatching(request.resourceProfile())) {
 				pendingRequests.remove(request.allocationID());
@@ -408,6 +454,14 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 			}
 		}
 
+		// try the requests waiting for a resource manager connection next
+		for (PendingRequest request : waitingForResourceManager.values()) {
+			if (slotResources.isMatching(request.resourceProfile())) {
+				waitingForResourceManager.remove(request.allocationID());
+				return request;
+			}
+		}
+
 		// no request pending, or no request matches
 		return null;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b3283ec/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
new file mode 100644
index 0000000..89fd22f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.util.clock.SystemClock;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the {@link SlotPool} using a proper RPC setup.
+ */
+public class SlotPoolRpcTest {
+
+	private static RpcService rpcService;
+
+	// ------------------------------------------------------------------------
+	//  setup
+	// ------------------------------------------------------------------------
+
+	@BeforeClass
+	public static void setup() {
+		ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+		rpcService = new AkkaRpcService(actorSystem, Time.seconds(10));
+	}
+
+	@AfterClass
+	public static void shutdown() {
+		rpcService.stopService();
+	}
+
+	// ------------------------------------------------------------------------
+	//  tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testSlotAllocationNoResourceManager() throws Exception {
+		final JobID jid = new JobID();
+
+		final SlotPool pool = new SlotPool(
+				rpcService, jid,
+				SystemClock.getInstance(),
+				Time.days(1), Time.days(1),
+				Time.milliseconds(100) // this is the timeout for the request tested here
+		);
+		pool.start(UUID.randomUUID());
+		// no ResourceManager is ever connected, so the stashed request must be failed by its timeout
+		try {
+			Future<SimpleSlot> future = pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null);
+			future.get(4, TimeUnit.SECONDS);
+			fail("We expected an ExecutionException.");
+		}
+		catch (ExecutionException e) {
+			assertEquals(NoResourceAvailableException.class, e.getCause().getClass());
+		}
+		catch (TimeoutException e) {
+			fail("future timed out rather than being failed");
+		}
+		finally {
+			// stop the endpoint so it does not linger on the shared RpcService
+			pool.shutDown();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b3283ec/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 5fa7af3..97457e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
@@ -42,17 +41,13 @@ import org.mockito.ArgumentCaptor;
 
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -123,28 +118,6 @@ public class SlotPoolTest extends TestLogger {
 	}
 
 	@Test
-	public void testAllocateSlotWithoutResourceManager() throws Exception {
-		slotPool.disconnectResourceManager();
-		Future<SimpleSlot> future = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
-		future.handleAsync(
-			new BiFunction<SimpleSlot, Throwable, Void>() {
-				@Override
-				public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
-					assertNull(simpleSlot);
-					assertNotNull(throwable);
-					return null;
-				}
-			},
-			rpcService.getExecutor());
-		try {
-			future.get(1, TimeUnit.SECONDS);
-			fail("We expected a ExecutionException.");
-		} catch (ExecutionException ex) {
-			// we expect the exception
-		}
-	}
-
-	@Test
 	public void testAllocationFulfilledByReturnedSlot() throws Exception {
 		ResourceID resourceID = new ResourceID("resource");
 		slotPool.registerTaskManager(resourceID);