You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/11 15:14:30 UTC

[flink] branch release-1.6 updated: [FLINK-10848] Remove container requests after successful container allocation

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new 4022836  [FLINK-10848] Remove container requests after successful container allocation
4022836 is described below

commit 4022836148a9e7cf39859389e644875dcd2a05e4
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Jan 8 13:06:50 2019 +0100

    [FLINK-10848] Remove container requests after successful container allocation
    
    This commit removes container requests after containers have been allocated. This prevents us
    from requesting more and more containers from Yarn in case of a recovery.
    
    Since we cannot rely on the reported container Resource, we remove the container request by
    using the requested Resource. This is due to Yarn's DefaultResourceCalculator, which neglects the
    number of vCores when allocating containers.
---
 .../flink/yarn/YarnFlinkResourceManager.java       |  73 +++++++++++----
 .../org/apache/flink/yarn/YarnResourceManager.java | 101 ++++++++++++++-------
 flink-yarn/src/main/resources/log4j.properties     |   2 +-
 .../flink/yarn/YarnFlinkResourceManagerTest.java   |  30 +++---
 .../apache/flink/yarn/YarnResourceManagerTest.java |  12 ++-
 .../src/test/resources/log4j-test.properties       |   2 +-
 6 files changed, 150 insertions(+), 70 deletions(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
index 8e686bb..e8e55c3 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
@@ -41,15 +41,20 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.slf4j.Logger;
 
+import javax.annotation.Nonnull;
+
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -72,6 +77,8 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 	 * Container ID generation may vary across Hadoop versions. */
 	static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
 
+	private static final Priority RM_REQUEST_PRIORITY = Priority.newInstance(0);
+
 	/** The containers where a TaskManager is starting and we are waiting for it to register. */
 	private final Map<ResourceID, YarnContainerInLaunch> containersInLaunch;
 
@@ -314,6 +321,21 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 
 	@Override
 	protected void requestNewWorkers(int numWorkers) {
+		final Resource capability = getContainerResource();
+
+		for (int i = 0; i < numWorkers; i++) {
+			numPendingContainerRequests++;
+			LOG.info("Requesting new TaskManager container with {} megabytes memory. Pending requests: {}",
+				capability.getMemory(), numPendingContainerRequests);
+
+			resourceManagerClient.addContainerRequest(createContainerRequest(capability));
+		}
+
+		// make sure we transmit the request fast and receive fast news of granted allocations
+		resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
+	}
+
+	private Resource getContainerResource() {
 		final long mem = taskManagerParameters.taskManagerTotalMemoryMB();
 		final int containerMemorySizeMB;
 
@@ -325,25 +347,15 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 				mem, containerMemorySizeMB);
 		}
 
-		for (int i = 0; i < numWorkers; i++) {
-			numPendingContainerRequests++;
-			LOG.info("Requesting new TaskManager container with {} megabytes memory. Pending requests: {}",
-				containerMemorySizeMB, numPendingContainerRequests);
-
-			// Priority for worker containers - priorities are intra-application
-			Priority priority = Priority.newInstance(0);
-
-			// Resource requirements for worker containers
-			int taskManagerSlots = taskManagerParameters.numSlots();
-			int vcores = config.getInteger(YarnConfigOptions.VCORES, Math.max(taskManagerSlots, 1));
-			Resource capability = Resource.newInstance(containerMemorySizeMB, vcores);
-
-			resourceManagerClient.addContainerRequest(
-				new AMRMClient.ContainerRequest(capability, null, null, priority));
-		}
+		// Resource requirements for worker containers
+		int taskManagerSlots = taskManagerParameters.numSlots();
+		int vcores = config.getInteger(YarnConfigOptions.VCORES, Math.max(taskManagerSlots, 1));
+		return Resource.newInstance(containerMemorySizeMB, vcores);
+	}
 
-		// make sure we transmit the request fast and receive fast news of granted allocations
-		resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
+	@Nonnull
+	private AMRMClient.ContainerRequest createContainerRequest(Resource capability) {
+		return new AMRMClient.ContainerRequest(capability, null, null, RM_REQUEST_PRIORITY);
 	}
 
 	@Override
@@ -434,7 +446,14 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 		final int numRequired = getDesignatedWorkerPoolSize();
 		final int numRegistered = getNumberOfStartedTaskManagers();
 
+		final Collection<AMRMClient.ContainerRequest> pendingRequests = getPendingRequests();
+		final Iterator<AMRMClient.ContainerRequest> pendingRequestsIterator = pendingRequests.iterator();
+
 		for (Container container : containers) {
+			if (numPendingContainerRequests > 0) {
+				numPendingContainerRequests -= 1;
+				resourceManagerClient.removeContainerRequest(pendingRequestsIterator.next());
+			}
 			numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
 			LOG.info("Received new container: {} - Remaining pending container requests: {}",
 				container.getId(), numPendingContainerRequests);
@@ -487,6 +506,24 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 		triggerCheckWorkers();
 	}
 
+	private Collection<AMRMClient.ContainerRequest> getPendingRequests() {
+		final List<? extends Collection<AMRMClient.ContainerRequest>> matchingRequests = resourceManagerClient.getMatchingRequests(RM_REQUEST_PRIORITY, ResourceRequest.ANY, getContainerResource());
+
+		final Collection<AMRMClient.ContainerRequest> result;
+
+		if (matchingRequests.isEmpty()) {
+			result = Collections.emptyList();
+		} else {
+			result = new ArrayList<>(matchingRequests.get(0));
+		}
+
+		Preconditions.checkState(
+			result.size() == numPendingContainerRequests,
+			"The RMClient's and YarnResourceManagers internal state about the number of pending container requests has diverged. Number client's pending container requests %s != Number RM's pending container requests %s.", result.size(), numPendingContainerRequests);
+
+		return result;
+	}
+
 	/**
 	 * Invoked when the ResourceManager informs of completed containers.
 	 * Called via an actor message by the callback from the ResourceManager client.
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index ead0ac0..7081657 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
@@ -42,6 +43,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -53,14 +55,20 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -73,6 +81,7 @@ import java.util.concurrent.ConcurrentMap;
  */
 public class YarnResourceManager extends ResourceManager<YarnWorkerNode> implements AMRMClientAsync.CallbackHandler {
 
+	private static final Priority RM_REQUEST_PRIORITY = Priority.newInstance(1);
 	/** The process environment variables. */
 	private final Map<String, String> env;
 
@@ -117,6 +126,8 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 
 	private final Map<ResourceProfile, Integer> resourcePriorities = new HashMap<>();
 
+	private final Resource resource;
+
 	public YarnResourceManager(
 			RpcService rpcService,
 			String resourceManagerEndpointId,
@@ -169,6 +180,8 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		this.numberOfTaskSlots = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
 		this.defaultTaskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes();
 		this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
+
+		this.resource = Resource.newInstance(defaultTaskManagerMemoryMB, defaultCpus);
 	}
 
 	protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(
@@ -290,13 +303,15 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 
 	@Override
 	public void startNewWorker(ResourceProfile resourceProfile) {
-		// Priority for worker containers - priorities are intra-application
-		//TODO: set priority according to the resource allocated
-		Priority priority = Priority.newInstance(generatePriority(resourceProfile));
-		int mem = resourceProfile.getMemoryInMB() < 0 ? defaultTaskManagerMemoryMB : resourceProfile.getMemoryInMB();
-		int vcore = resourceProfile.getCpuCores() < 1 ? defaultCpus : (int) resourceProfile.getCpuCores();
-		Resource capability = Resource.newInstance(mem, vcore);
-		requestYarnContainer(capability, priority);
+		Preconditions.checkArgument(
+			ResourceProfile.UNKNOWN.equals(resourceProfile),
+			"The YarnResourceManager does not support custom ResourceProfiles yet. It assumes that all containers have the same resources.");
+		requestYarnContainer();
+	}
+
+	@VisibleForTesting
+	Resource getContainerResource() {
+		return resource;
 	}
 
 	@Override
@@ -339,8 +354,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 
 					if (yarnWorkerNode != null) {
 						// Container completed unexpectedly ~> start a new one
-						final Container container = yarnWorkerNode.getContainer();
-						requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
+						requestYarnContainer();
 					}
 					// Eagerly close the connection with task manager.
 					closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
@@ -352,6 +366,9 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 	@Override
 	public void onContainersAllocated(List<Container> containers) {
 		runAsync(() -> {
+			final Collection<AMRMClient.ContainerRequest> pendingRequests = getPendingRequests();
+			final Iterator<AMRMClient.ContainerRequest> pendingRequestsIterator = pendingRequests.iterator();
+
 			for (Container container : containers) {
 				log.info(
 					"Received new container: {} - Remaining pending container requests: {}",
@@ -359,7 +376,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 					numPendingContainerRequests);
 
 				if (numPendingContainerRequests > 0) {
-					numPendingContainerRequests--;
+					removeContainerRequest(pendingRequestsIterator.next());
 
 					final String containerIdStr = container.getId().toString();
 					final ResourceID resourceId = new ResourceID(containerIdStr);
@@ -381,7 +398,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 						workerNodeMap.remove(resourceId);
 						resourceManagerClient.releaseAssignedContainer(container.getId());
 						// and ask for a new one
-						requestYarnContainer(container.getResource(), container.getPriority());
+						requestYarnContainer();
 					}
 				} else {
 					// return the excessive containers
@@ -398,6 +415,36 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		});
 	}
 
+	private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest) {
+		numPendingContainerRequests--;
+
+		log.info("Removing container request {}. Pending container requests {}.", pendingContainerRequest, numPendingContainerRequests);
+
+		resourceManagerClient.removeContainerRequest(pendingContainerRequest);
+	}
+
+	private Collection<AMRMClient.ContainerRequest> getPendingRequests() {
+		final List<? extends Collection<AMRMClient.ContainerRequest>> matchingRequests = resourceManagerClient.getMatchingRequests(
+			RM_REQUEST_PRIORITY,
+			ResourceRequest.ANY,
+			getContainerResource());
+
+		final Collection<AMRMClient.ContainerRequest> matchingContainerRequests;
+
+		if (matchingRequests.isEmpty()) {
+			matchingContainerRequests = Collections.emptyList();
+		} else {
+			final Collection<AMRMClient.ContainerRequest> collection = matchingRequests.get(0);
+			matchingContainerRequests = new ArrayList<>(collection);
+		}
+
+		Preconditions.checkState(
+			matchingContainerRequests.size() == numPendingContainerRequests,
+			"The RMClient's and YarnResourceManagers internal state about the number of pending container requests has diverged. Number client's pending container requests %s != Number RM's pending container requests %s.", matchingContainerRequests.size(), numPendingContainerRequests);
+
+		return matchingContainerRequests;
+	}
+
 	@Override
 	public void onShutdownRequest() {
 		shutDown();
@@ -452,11 +499,11 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 	/**
 	 * Request new container if pending containers cannot satisfies pending slot requests.
 	 */
-	private void requestYarnContainer(Resource resource, Priority priority) {
+	private void requestYarnContainer() {
 		int pendingSlotRequests = getNumberPendingSlotRequests();
 		int pendingSlotAllocation = numPendingContainerRequests * numberOfTaskSlots;
 		if (pendingSlotRequests > pendingSlotAllocation) {
-			resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority));
+			resourceManagerClient.addContainerRequest(getContainerRequest());
 
 			// make sure we transmit the request fast and receive fast news of granted allocations
 			resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
@@ -469,6 +516,16 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		}
 	}
 
+	@Nonnull
+	@VisibleForTesting
+	AMRMClient.ContainerRequest getContainerRequest() {
+		return new AMRMClient.ContainerRequest(
+			getContainerResource(),
+			null,
+			null,
+			RM_REQUEST_PRIORITY);
+	}
+
 	private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
 			throws Exception {
 		// init the ContainerLaunchContext
@@ -505,22 +562,4 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 				.put(ENV_FLINK_NODE_ID, host);
 		return taskExecutorLaunchContext;
 	}
-
-
-
-	/**
-	 * Generate priority by given resource profile.
-	 * Priority is only used for distinguishing request of different resource.
-	 * @param resourceProfile The resource profile of a request
-	 * @return The priority of this resource profile.
-	 */
-	private int generatePriority(ResourceProfile resourceProfile) {
-		if (resourcePriorities.containsKey(resourceProfile)) {
-			return resourcePriorities.get(resourceProfile);
-		} else {
-			int priority = resourcePriorities.size();
-			resourcePriorities.put(resourceProfile, priority);
-			return priority;
-		}
-	}
 }
diff --git a/flink-yarn/src/main/resources/log4j.properties b/flink-yarn/src/main/resources/log4j.properties
index b2ad0d3..e84cd49 100644
--- a/flink-yarn/src/main/resources/log4j.properties
+++ b/flink-yarn/src/main/resources/log4j.properties
@@ -17,7 +17,7 @@
 ################################################################################
 
 # Convenience file for local debugging of the JobManager/TaskManager.
-log4j.rootLogger=INFO, console
+log4j.rootLogger=OFF, console
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
index 10b2ce9..3489726 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
@@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
@@ -50,9 +52,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -69,7 +69,10 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -128,20 +131,6 @@ public class YarnFlinkResourceManagerTest extends TestLogger {
 				containerList.add(mockContainer);
 			}
 
-			doAnswer(new Answer() {
-				int counter = 0;
-				@Override
-				public Object answer(InvocationOnMock invocation) throws Throwable {
-					if (counter < containerList.size()) {
-						callbackHandler.onContainersAllocated(
-							Collections.singletonList(
-								containerList.get(counter++)
-							));
-					}
-					return null;
-				}
-			}).when(resourceManagerClient).addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class));
-
 			final CompletableFuture<AkkaActorGateway> resourceManagerFuture = new CompletableFuture<>();
 			final CompletableFuture<AkkaActorGateway> leaderGatewayFuture = new CompletableFuture<>();
 
@@ -159,8 +148,8 @@ public class YarnFlinkResourceManagerTest extends TestLogger {
 				})
 				.when(nodeManagerClient)
 				.startContainer(
-					Matchers.any(Container.class),
-					Matchers.any(ContainerLaunchContext.class));
+					any(Container.class),
+					any(ContainerLaunchContext.class));
 
 			ActorRef resourceManager = null;
 			ActorRef leader1;
@@ -191,6 +180,9 @@ public class YarnFlinkResourceManagerTest extends TestLogger {
 						nodeManagerClient
 					));
 
+				doReturn(Collections.singletonList(Collections.nCopies(numInitialTaskManagers, new AMRMClient.ContainerRequest(Resource.newInstance(1024 * 1024, 1), null, null, Priority.newInstance(0)))))
+					.when(resourceManagerClient).getMatchingRequests(any(Priority.class), anyString(), any(Resource.class));
+
 				leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID);
 
 				final AkkaActorGateway leader1Gateway = new AkkaActorGateway(leader1, leaderSessionID);
@@ -203,6 +195,8 @@ public class YarnFlinkResourceManagerTest extends TestLogger {
 
 				resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.emptyList()));
 
+				callbackHandler.onContainersAllocated(containerList);
+
 				for (int i = 0; i < containerList.size(); i++) {
 					expectMsgClass(deadline.timeLeft(), Acknowledge.class);
 				}
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 58d297d..85e3d53 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -105,7 +105,9 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -233,7 +235,7 @@ public class YarnResourceManagerTest extends TestLogger {
 		final HardwareDescription hardwareDescription = new HardwareDescription(1, 2L, 3L, 4L);
 
 		// domain objects for test purposes
-		final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 200);
+		final ResourceProfile resourceProfile1 = ResourceProfile.UNKNOWN;
 
 		public ContainerId task = ContainerId.newInstance(
 				ApplicationAttemptId.newInstance(ApplicationId.newInstance(1L, 0), 0), 1);
@@ -357,6 +359,10 @@ public class YarnResourceManagerTest extends TestLogger {
 			when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
 			when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
 			when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
+
+			doReturn(Collections.singletonList(Collections.singletonList(resourceManager.getContainerRequest())))
+				.when(mockResourceManagerClient).getMatchingRequests(any(Priority.class), anyString(), any(Resource.class));
+
 			resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
 			verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
 			verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
@@ -457,6 +463,9 @@ public class YarnResourceManagerTest extends TestLogger {
 					1),
 				1);
 
+			doReturn(Collections.singletonList(Collections.singletonList(resourceManager.getContainerRequest())))
+				.when(mockResourceManagerClient).getMatchingRequests(any(Priority.class), anyString(), any(Resource.class));
+
 			// Callback from YARN when container is allocated.
 			Container testingContainer = mock(Container.class);
 			when(testingContainer.getId()).thenReturn(testContainerId);
@@ -464,6 +473,7 @@ public class YarnResourceManagerTest extends TestLogger {
 			when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
 			when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
 			resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
+			verify(mockResourceManagerClient).removeContainerRequest(any(AMRMClient.ContainerRequest.class));
 			verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
 			verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
 
diff --git a/flink-yarn/src/test/resources/log4j-test.properties b/flink-yarn/src/test/resources/log4j-test.properties
index 2226f68..5b1e4ed 100644
--- a/flink-yarn/src/test/resources/log4j-test.properties
+++ b/flink-yarn/src/test/resources/log4j-test.properties
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
 
 # A1 is set to be a ConsoleAppender.
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender