You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/02 21:49:54 UTC

[1/2] flink git commit: [FLINK-9567][yarn] Only restart containers if there are pending slot requests

Repository: flink
Updated Branches:
  refs/heads/master 5755a1345 -> a49587a6e


[FLINK-9567][yarn] Only restart containers if there are pending slot requests

The YarnResourceManager should only restart completed containers (i.e. request replacement
containers) if it still has pending slot requests left. This fixes the problem that, after a
restart of the YarnResourceManager, it can recover containers from a previous attempt that are
about to complete (the completion was triggered by the previous attempt). These containers
should not be replaced, because they are no longer needed.

This closes #6237.

[FLINK-9567][runtime][yarn] Fix bug where Flink does not release YARN containers when the onContainersCompleted callback happens after a full restart


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49156e8d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49156e8d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49156e8d

Branch: refs/heads/master
Commit: 49156e8d6c2cb223e350760ad69d1898d67dacab
Parents: 5755a13
Author: yangshimin <ya...@youzan.com>
Authored: Mon Jul 2 11:56:00 2018 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 2 16:44:28 2018 +0200

----------------------------------------------------------------------
 .../resourcemanager/ResourceManager.java        |  8 +++
 .../slotmanager/SlotManager.java                |  2 +
 .../apache/flink/yarn/YarnResourceManager.java  | 12 +++-
 .../flink/yarn/YarnResourceManagerTest.java     | 59 ++++++++++++++++++++
 4 files changed, 80 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/49156e8d/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 3ea5c2e..6b104ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -1125,5 +1125,13 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 			return CompletableFuture.completedFuture(null);
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	//  Resource Management
+	// ------------------------------------------------------------------------
+
+	protected int getNumberPendingSlotRequests() { // exposes the SlotManager's pending slot request count to subclasses such as YarnResourceManager
+		return slotManager.getNumberPendingSlotRequests();
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/49156e8d/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index b2dbba8..d74979a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -168,6 +168,8 @@ public class SlotManager implements AutoCloseable {
 		}
 	}
 
+	public int getNumberPendingSlotRequests() {return pendingSlotRequests.size(); } // number of entries currently held in pendingSlotRequests
+
 	// ---------------------------------------------------------------------------------------------
 	// Component lifecycle methods
 	// ---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/49156e8d/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index c498634..ab031be 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -334,7 +334,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 					if (yarnWorkerNode != null) {
 						// Container completed unexpectedly ~> start a new one
 						final Container container = yarnWorkerNode.getContainer();
-						requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
+						internalRequestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
 						closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
 					}
 				}
@@ -510,4 +510,14 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		}
 	}
 
+	/**
+	 * Requests a new container only if the still-pending container requests cannot cover the pending slot requests.
+	 */
+	private void internalRequestYarnContainer(Resource resource, Priority priority) {
+		int pendingSlotRequests = getNumberPendingSlotRequests();
+		int pendingSlotAllocation = numPendingContainerRequests * defaultNumSlots; // slots expected from container requests still pending (assumes defaultNumSlots slots per container)
+		if (pendingSlotRequests > pendingSlotAllocation) { // otherwise demand is already covered and no container is requested (FLINK-9567)
+			requestYarnContainer(resource, priority);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/49156e8d/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 4fffc2b..8c6d7f7 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -103,6 +105,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -206,6 +209,7 @@ public class YarnResourceManagerTest extends TestLogger {
 		protected void runAsync(final Runnable runnable) {
 			runnable.run();
 		}
+
 	}
 
 	class Context {
@@ -421,4 +425,59 @@ public class YarnResourceManagerTest extends TestLogger {
 			assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath()));
 		}};
 	}
+
+	/**
+	 * Tests that the YarnResourceManager does not request more containers than needed when
+	 * YARN reports completed containers through the onContainersCompleted callback.
+	 * @throws Exception if any step of the test fails
+	 */
+	@Test
+	public void testOnContainerCompleted() throws Exception {
+		new Context() {{
+			startResourceManager();
+			CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+				rmServices.slotManager.registerSlotRequest(
+					new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
+				return null;
+			});
+
+			// wait until the slot request has been registered in the main thread
+			registerSlotRequestFuture.get();
+
+			ContainerId testContainerId = ContainerId.newInstance(
+				ApplicationAttemptId.newInstance(
+					ApplicationId.newInstance(System.currentTimeMillis(), 1),
+					1),
+				1);
+
+			// Simulate the YARN callback that allocates a container; the RM is expected to start it.
+			Container testingContainer = mock(Container.class);
+			when(testingContainer.getId()).thenReturn(testContainerId);
+			when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
+			when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
+			when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
+			resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
+			verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); // presumably issued when the slot request was registered
+			verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
+
+			// Simulate YARN reporting the container as completed: the pending slot request cannot be covered
+			// by still-pending container requests, so a replacement container must be requested.
+			ContainerStatus testingContainerStatus = mock(ContainerStatus.class);
+			when(testingContainerStatus.getContainerId()).thenReturn(testContainerId);
+			when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE);
+			when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit");
+			when(testingContainerStatus.getExitStatus()).thenReturn(-1);
+			resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus));
+			verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); // one more request: the replacement container
+
+			// Simulate a completion callback that happened before the global failure: the pending slot request is
+			// already covered by the still-pending container request, so no new container should be requested.
+			when(testingContainerStatus.getContainerId()).thenReturn(testContainerId); // NOTE(review): reuses the already-completed container id; if the RM no longer tracks it, this is a no-op and may not exercise the pending-request guard — verify
+			when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE);
+			when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit");
+			when(testingContainerStatus.getExitStatus()).thenReturn(-1);
+			resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus));
+			verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); // still two: no additional container request
+		}};
+	}
 }


[2/2] flink git commit: [hotfix][yarn] Extract number of task slots once from configuration

Posted by tr...@apache.org.
[hotfix][yarn] Extract number of task slots once from configuration

Let the YarnResourceManager extract the number of task slots from the provided configuration
only once, in its constructor.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a49587a6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a49587a6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a49587a6

Branch: refs/heads/master
Commit: a49587a6e9e1500e55e6bad8510791f5ec01c216
Parents: 49156e8
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 2 16:46:14 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 2 23:48:42 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/yarn/YarnResourceManager.java    | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a49587a6/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index ab031be..572e6ba 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -97,9 +97,9 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 	@Nullable
 	private final String webInterfaceUrl;
 
-	private final int defaultTaskManagerMemoryMB;
+	private final int numberOfTaskSlots;
 
-	private final int defaultNumSlots;
+	private final int defaultTaskManagerMemoryMB;
 
 	private final int defaultCpus;
 
@@ -161,9 +161,9 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		numPendingContainerRequests = 0;
 
 		this.webInterfaceUrl = webInterfaceUrl;
+		this.numberOfTaskSlots = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
 		this.defaultTaskManagerMemoryMB = flinkConfig.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
-		this.defaultNumSlots = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
-		this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, defaultNumSlots);
+		this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
 	}
 
 	protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(
@@ -460,10 +460,8 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		// init the ContainerLaunchContext
 		final String currDir = env.get(ApplicationConstants.Environment.PWD.key());
 
-		final int numSlots = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
-
 		final ContaineredTaskManagerParameters taskManagerParameters =
-				ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), numSlots);
+				ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), numberOfTaskSlots);
 
 		log.debug("TaskExecutor {} will be started with container size {} MB, JVM heap size {} MB, " +
 				"JVM direct memory limit {} MB",
@@ -515,7 +513,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 	 */
 	private void internalRequestYarnContainer(Resource resource, Priority priority) {
 		int pendingSlotRequests = getNumberPendingSlotRequests();
-		int pendingSlotAllocation = numPendingContainerRequests * defaultNumSlots;
+		int pendingSlotAllocation = numPendingContainerRequests * numberOfTaskSlots;
 		if (pendingSlotRequests > pendingSlotAllocation) {
 			requestYarnContainer(resource, priority);
 		}