Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/14 13:31:45 UTC

[GitHub] [flink] tillrohrmann commented on a change in pull request #13592: [FLINK-19324][yarn] Map requested and allocated containers with priority on YARN

tillrohrmann commented on a change in pull request #13592:
URL: https://github.com/apache/flink/pull/13592#discussion_r504657264



##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -260,36 +259,44 @@ public void releaseResource(YarnWorkerNode workerNode) {
 	//  Internal
 	// ------------------------------------------------------------------------
 
-	private void onContainersOfResourceAllocated(Resource resource, List<Container> containers) {
-		final List<TaskExecutorProcessSpec> pendingTaskExecutorProcessSpecs =
-			taskExecutorProcessSpecContainerResourceAdapter.getTaskExecutorProcessSpec(resource, matchingStrategy).stream()
-				.flatMap(spec -> Collections.nCopies(getNumRequestedNotAllocatedWorkersFor(spec), spec).stream())
-				.collect(Collectors.toList());
+	private void onContainersOfPriorityAllocated(Priority priority, List<Container> containers) {
+		final Optional<TaskExecutorProcessSpecContainerResourcePriorityAdapter.TaskExecutorProcessSpecAndResource> taskExecutorProcessSpecAndResourceOpt =
+			taskExecutorProcessSpecContainerResourcePriorityAdapter.getTaskExecutorProcessSpecAndResource(priority);
+
+		if (!taskExecutorProcessSpecAndResourceOpt.isPresent()) {
+			log.warn("Receive {} containers with unrecognized priority {}. This should not happen.",
+				containers.size(), priority.getPriority());
+			for (Container container : containers) {
+				returnExcessContainer(container);
+			}
+			return;
+		}

Review comment:
       Is the reason we are not failing hard here that this can happen after a JobManager failover?
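
The trade-off behind this question can be sketched in plain Java. This is a minimal model under stated assumptions, not Flink's code: `PriorityAllocationHandler` is a hypothetical name, containers are string IDs, and specs are plain names. A flag chooses between failing hard on an unknown priority and handing the containers back, which is what the diff does via `returnExcessContainer`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of onContainersOfPriorityAllocated; not Flink's actual code.
class PriorityAllocationHandler {
    private final Map<Integer, String> specByPriority = new HashMap<>();
    private final boolean failOnUnknownPriority;
    // Containers handed back to YARN because their priority was not recognized.
    final List<String> returnedContainers = new ArrayList<>();

    PriorityAllocationHandler(boolean failOnUnknownPriority) {
        this.failOnUnknownPriority = failOnUnknownPriority;
    }

    void register(int priority, String spec) {
        specByPriority.put(priority, spec);
    }

    // Returns the containers that could be matched to a known spec.
    List<String> onContainersOfPriorityAllocated(int priority, List<String> containers) {
        String spec = specByPriority.get(priority);
        if (spec == null) {
            if (failOnUnknownPriority) {
                // Fail-fast variant: treat an unknown priority as a bug.
                throw new IllegalStateException("Unrecognized priority " + priority);
            }
            // Tolerant variant (what the diff does): give the containers back.
            returnedContainers.addAll(containers);
            return Collections.emptyList();
        }
        // Every container of a known priority is matched to that priority's spec.
        return new ArrayList<>(containers);
    }

    public static void main(String[] args) {
        PriorityAllocationHandler handler = new PriorityAllocationHandler(false);
        handler.register(1, "default");
        System.out.println(handler.onContainersOfPriorityAllocated(2, Arrays.asList("c1"))); // prints []
        System.out.println(handler.returnedContainers); // prints [c1]
    }
}
```

If unknown priorities can legitimately show up after a JobManager failover, the tolerant branch keeps the ResourceManager alive; otherwise the fail-fast branch surfaces the bug immediately.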

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -442,38 +444,25 @@ private RegisterApplicationMasterResponse registerApplicationMaster() throws Exc
 		return resourceManagerClient.registerApplicationMaster(rpcAddress, restPort, webInterfaceUrl);
 	}
 
-	private void getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse registerApplicationMasterResponse) {
+	private int getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse registerApplicationMasterResponse) {
 		final List<Container> containersFromPreviousAttempts =
 			registerApplicationMasterResponseReflector.getContainersFromPreviousAttempts(registerApplicationMasterResponse);
 		final List<YarnWorkerNode> recoveredWorkers = new ArrayList<>();
 
 		log.info("Recovered {} containers from previous attempts ({}).", containersFromPreviousAttempts.size(), containersFromPreviousAttempts);
 
+		int maxPriority = 0;
 		for (Container container : containersFromPreviousAttempts) {
 			final YarnWorkerNode worker = new YarnWorkerNode(container, getContainerResourceId(container));
 			recoveredWorkers.add(worker);
+			maxPriority = Math.max(container.getPriority().getPriority(), maxPriority);

Review comment:
       I guess there is no easy way to derive the `TaskExecutorProcessSpec` from a `Container` in order to restore the old priority-to-`TaskExecutorProcessSpec` mapping?
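
A minimal sketch of why the reverse mapping is lossy, under the simplifying assumption that a container only reports total memory and vcores (YARN may additionally round requests up). `SketchSpec` and `SketchResource` are hypothetical stand-ins for `TaskExecutorProcessSpec` and YARN's `Resource`, with far fewer components than the real classes.

```java
import java.util.Objects;

// Hypothetical stand-in for TaskExecutorProcessSpec: three memory pools plus CPU cores.
final class SketchSpec {
    final int heapMb, managedMb, networkMb, cores;

    SketchSpec(int heapMb, int managedMb, int networkMb, int cores) {
        this.heapMb = heapMb;
        this.managedMb = managedMb;
        this.networkMb = networkMb;
        this.cores = cores;
    }
}

// Hypothetical stand-in for a container Resource: total memory and vcores only.
final class SketchResource {
    final int memoryMb, vcores;

    SketchResource(int memoryMb, int vcores) {
        this.memoryMb = memoryMb;
        this.vcores = vcores;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SketchResource)) return false;
        SketchResource r = (SketchResource) o;
        return memoryMb == r.memoryMb && vcores == r.vcores;
    }

    @Override
    public int hashCode() {
        return Objects.hash(memoryMb, vcores);
    }
}

final class SpecToResource {
    // Forward mapping: the memory pools are summed, so their individual sizes are lost.
    static SketchResource toResource(SketchSpec spec) {
        return new SketchResource(spec.heapMb + spec.managedMb + spec.networkMb, spec.cores);
    }

    public static void main(String[] args) {
        SketchSpec a = new SketchSpec(512, 256, 256, 1);
        SketchSpec b = new SketchSpec(256, 512, 256, 1);
        // Both specs yield a 1024 MB / 1 vcore container.
        System.out.println(toResource(a).equals(toResource(b))); // prints true
    }
}
```

In this model, recovered containers could at best rebuild a priority-to-`Resource` mapping, not a priority-to-`TaskExecutorProcessSpec` one.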

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -170,8 +159,12 @@ protected void initializeInternal() throws Exception {
 			resourceManagerClient.start();
 
 			final RegisterApplicationMasterResponse registerApplicationMasterResponse = registerApplicationMaster();
-			getContainersFromPreviousAttempts(registerApplicationMasterResponse);
-			updateMatchingStrategy(registerApplicationMasterResponse);
+			int maxPriority = getContainersFromPreviousAttempts(registerApplicationMasterResponse);
+			taskExecutorProcessSpecContainerResourcePriorityAdapter =
+				new TaskExecutorProcessSpecContainerResourcePriorityAdapter(
+					maxPriority + 1,

Review comment:
       Could it become a problem that we keep increasing the priority value with every recovery? Potentially, it can grow arbitrarily large.
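
The growth can be simulated with a minimal model, assuming each attempt hands out priorities starting from `maxPriority + 1` (as in the diff) and that at least one container from each attempt is recovered by the next. `GrowingPriorityAllocator` is a hypothetical name and specs are plain strings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: one priority per distinct spec, handed out from an initial value.
class GrowingPriorityAllocator {
    private final Map<String, Integer> priorityBySpec = new HashMap<>();
    private int nextPriority;

    GrowingPriorityAllocator(int initialPriority) {
        this.nextPriority = initialPriority;
    }

    int priorityFor(String spec) {
        // The lambda may mutate nextPriority because it is a field, not a local variable.
        return priorityBySpec.computeIfAbsent(spec, s -> nextPriority++);
    }

    // Simulates repeated failovers where the next attempt recovers the previous attempt's
    // containers. Returns the priority the "default" spec gets in the last attempt.
    static int priorityAfterFailovers(int failovers) {
        int maxRecoveredPriority = 0; // first attempt: nothing to recover
        int priority = 0;
        for (int attempt = 0; attempt <= failovers; attempt++) {
            GrowingPriorityAllocator allocator = new GrowingPriorityAllocator(maxRecoveredPriority + 1);
            priority = allocator.priorityFor("default");
            maxRecoveredPriority = Math.max(maxRecoveredPriority, priority);
        }
        return priority;
    }

    public static void main(String[] args) {
        System.out.println(priorityAfterFailovers(3)); // prints 4
    }
}
```

In this model the default spec's priority equals the number of failovers plus one, so it is bounded only by how often the JobManager fails over (and ultimately by the `int` range).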

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -442,38 +444,25 @@ private RegisterApplicationMasterResponse registerApplicationMaster() throws Exc
 		return resourceManagerClient.registerApplicationMaster(rpcAddress, restPort, webInterfaceUrl);
 	}
 
-	private void getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse registerApplicationMasterResponse) {
+	private int getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse registerApplicationMasterResponse) {
 		final List<Container> containersFromPreviousAttempts =
 			registerApplicationMasterResponseReflector.getContainersFromPreviousAttempts(registerApplicationMasterResponse);
 		final List<YarnWorkerNode> recoveredWorkers = new ArrayList<>();
 
 		log.info("Recovered {} containers from previous attempts ({}).", containersFromPreviousAttempts.size(), containersFromPreviousAttempts);
 
+		int maxPriority = 0;
 		for (Container container : containersFromPreviousAttempts) {
 			final YarnWorkerNode worker = new YarnWorkerNode(container, getContainerResourceId(container));
 			recoveredWorkers.add(worker);
+			maxPriority = Math.max(container.getPriority().getPriority(), maxPriority);

Review comment:
       Would it be an option to say that the default `TaskExecutorProcessSpec` always has the priority `1`? That way, the behaviour with respect to priorities would not change for the current Flink version.
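
A minimal sketch of that suggestion, assuming (as the comment implies) that containers have so far been requested with priority `1`: the default spec keeps priority `1` in every attempt, and only additional specs get new priorities above the recovered maximum. `PinnedDefaultPriorityAllocator` is a hypothetical name and specs are plain strings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical variant: the default spec is pinned to priority 1; other specs receive
// priorities above everything recovered from previous attempts.
class PinnedDefaultPriorityAllocator {
    static final int DEFAULT_PRIORITY = 1;

    private final Map<String, Integer> priorityBySpec = new HashMap<>();
    private int nextPriority;

    PinnedDefaultPriorityAllocator(String defaultSpec, int maxRecoveredPriority) {
        priorityBySpec.put(defaultSpec, DEFAULT_PRIORITY);
        // Never hand DEFAULT_PRIORITY, or any recovered priority, to another spec.
        this.nextPriority = Math.max(maxRecoveredPriority, DEFAULT_PRIORITY) + 1;
    }

    int priorityFor(String spec) {
        return priorityBySpec.computeIfAbsent(spec, s -> nextPriority++);
    }

    public static void main(String[] args) {
        PinnedDefaultPriorityAllocator allocator = new PinnedDefaultPriorityAllocator("default", 5);
        System.out.println(allocator.priorityFor("default")); // prints 1
        System.out.println(allocator.priorityFor("large"));   // prints 6
    }
}
```

With only the default spec in use, the requested priority stays at `1` across failovers in this model.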

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -228,21 +221,27 @@ public void deregisterApplication(ApplicationStatus finalStatus, @Nullable Strin
 
 	@Override
 	public CompletableFuture<YarnWorkerNode> requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
-		final Optional<Resource> containerResourceOptional = getContainerResource(taskExecutorProcessSpec);
+		checkInitialized();
+
 		final CompletableFuture<YarnWorkerNode> requestResourceFuture = new CompletableFuture<>();
 
-		if (containerResourceOptional.isPresent()) {
-			resourceManagerClient.addContainerRequest(getContainerRequest(containerResourceOptional.get()));
+		final Optional<TaskExecutorProcessSpecContainerResourcePriorityAdapter.PriorityAndResource> priorityAndResourceOpt =
+			taskExecutorProcessSpecContainerResourcePriorityAdapter.getPriorityAndResource(taskExecutorProcessSpec);
+
+		if (!priorityAndResourceOpt.isPresent()) {
+			requestResourceFuture.completeExceptionally(
+				new ResourceManagerException(String.format("Could not compute the container Resource from the given TaskExecutorProcessSpec %s.", taskExecutorProcessSpec)));

Review comment:
       Would a bit more detail be helpful here? For example, why we could not compute the container resource?
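
A minimal sketch of a more descriptive message, assuming (as one plausible cause) that the computation fails because the request exceeds the cluster's maximum container allocation. `ContainerResourceErrors`, `explain`, and `errorMessage` are hypothetical names, not Flink's API.

```java
import java.util.Optional;

// Hypothetical helper that explains why a spec could not be mapped to a container Resource.
final class ContainerResourceErrors {
    // Returns a human-readable reason, or empty if the request fits the maximum allocation.
    static Optional<String> explain(int requestedMemoryMb, int requestedVcores, int maxMemoryMb, int maxVcores) {
        if (requestedMemoryMb > maxMemoryMb) {
            return Optional.of(String.format(
                "requested memory of %d MB exceeds the maximum allocation of %d MB", requestedMemoryMb, maxMemoryMb));
        }
        if (requestedVcores > maxVcores) {
            return Optional.of(String.format(
                "requested %d vcores exceed the maximum allocation of %d vcores", requestedVcores, maxVcores));
        }
        return Optional.empty();
    }

    static String errorMessage(String spec, String reason) {
        return String.format(
            "Could not compute the container Resource from the given TaskExecutorProcessSpec %s: %s.", spec, reason);
    }

    public static void main(String[] args) {
        explain(8192, 2, 4096, 4).ifPresent(reason -> System.out.println(errorMessage("spec-x", reason)));
    }
}
```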




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org