Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/14 03:54:48 UTC

[GitHub] [flink] KarmaGYZ commented on a change in pull request #13592: [FLINK-19324][yarn] Map requested and allocated containers with priority on YARN

KarmaGYZ commented on a change in pull request #13592:
URL: https://github.com/apache/flink/pull/13592#discussion_r504365654



##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourcePriorityAdapter.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Utility class for converting between Flink {@link TaskExecutorProcessSpec} and Yarn {@link Resource} and {@link Priority}.
+ */
+public class TaskExecutorProcessSpecContainerResourcePriorityAdapter {
+
+	private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorProcessSpecContainerResourcePriorityAdapter.class);
+
+	private final Map<TaskExecutorProcessSpec, Resource> taskExecutorProcessSpecToResource;
+	private final Map<TaskExecutorProcessSpec, Priority> taskExecutorProcessSpecToPriority;

Review comment:
       We could merge these two maps, since by design they should always have the same key set.
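
       A minimal sketch of what merging the two maps could look like. It is not taken from the PR: `MergedMapSketch` and `ResourceAndPriority` are hypothetical names, and plain `String`/`int` values stand in for `TaskExecutorProcessSpec`, `Resource`, and `Priority` so that it compiles without Flink or Hadoop on the classpath.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: a String stands in for TaskExecutorProcessSpec and
// Resource, an int stands in for Priority.
final class MergedMapSketch {

    /** Hypothetical holder for the two values derived from one spec. */
    static final class ResourceAndPriority {
        final String resource;
        final int priority;

        ResourceAndPriority(String resource, int priority) {
            this.resource = resource;
            this.priority = priority;
        }
    }

    // One map instead of two, so the key sets cannot drift apart.
    private final Map<String, ResourceAndPriority> specToResourceAndPriority = new HashMap<>();
    private int nextPriority = 1;

    /** Registers a spec once; repeated calls return the existing entry. */
    ResourceAndPriority register(String spec, String resource) {
        return specToResourceAndPriority.computeIfAbsent(
            spec, ignored -> new ResourceAndPriority(resource, nextPriority++));
    }

    Optional<String> getResource(String spec) {
        return Optional.ofNullable(specToResourceAndPriority.get(spec)).map(v -> v.resource);
    }

    Optional<Integer> getPriority(String spec) {
        return Optional.ofNullable(specToResourceAndPriority.get(spec)).map(v -> v.priority);
    }
}
```

       With a single map, both lookups are either present or absent together for a given spec, which is the invariant the comment relies on.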

##########
File path: flink-yarn/src/test/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourcePriorityAdapterTest.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link TaskExecutorProcessSpecContainerResourcePriorityAdapter}.
+ */
+public class TaskExecutorProcessSpecContainerResourcePriorityAdapterTest extends TestLogger {
+
+	private static final Resource MAX_CONTAINER_RESOURCE = Resource.newInstance(102400, 100);

Review comment:
       I think we could add some tests for external resources. Specifically:
   - Check whether `TaskExecutorProcessSpecContainerResourcePriorityAdapter` can be constructed when the given external resource is not supported by the Yarn cluster.
   - For Hadoop 2.10+ (including 3.0+), guarded by `assumeTrue(HadoopUtils.isMinHadoopVersion(2, 10))`, set the external resource on `MAX_CONTAINER_RESOURCE` and add `testGetTaskExecutorProcessSpecWithExternalResource`.

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -260,36 +257,47 @@ public void releaseResource(YarnWorkerNode workerNode) {
 	//  Internal
 	// ------------------------------------------------------------------------
 
-	private void onContainersOfResourceAllocated(Resource resource, List<Container> containers) {
-		final List<TaskExecutorProcessSpec> pendingTaskExecutorProcessSpecs =
-			taskExecutorProcessSpecContainerResourceAdapter.getTaskExecutorProcessSpec(resource, matchingStrategy).stream()
-				.flatMap(spec -> Collections.nCopies(getNumRequestedNotAllocatedWorkersFor(spec), spec).stream())
-				.collect(Collectors.toList());
+	private void onContainersOfPriorityAllocated(Priority priority, List<Container> containers) {
+		final Optional<TaskExecutorProcessSpec> taskExecutorProcessSpecOpt =
+			taskExecutorProcessSpecContainerResourcePriorityAdapter.getTaskExecutorProcessSpec(priority);
+
+		if (!taskExecutorProcessSpecOpt.isPresent()) {
+			log.warn("Receive {} containers with unrecognized priority {}. This should not happen.",
+				containers.size(), priority.getPriority());
+			for (Container container : containers) {
+				returnExcessContainer(container);
+			}
+			return;
+		}
 
-		int numPending = pendingTaskExecutorProcessSpecs.size();
-		log.info("Received {} containers with resource {}, {} pending container requests.",
+		final TaskExecutorProcessSpec taskExecutorProcessSpec = taskExecutorProcessSpecOpt.get();
+
+		final Optional<Resource> resourceOpt = taskExecutorProcessSpecContainerResourcePriorityAdapter.getResource(taskExecutorProcessSpec);
+		Preconditions.checkState(resourceOpt.isPresent());

Review comment:
       We could also change the value type of `priorityToTaskExecutorProcessSpec` to a `Tuple2` of `TaskExecutorProcessSpec` and `Resource`.

##########
File path: flink-yarn/src/test/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourcePriorityAdapterTest.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link TaskExecutorProcessSpecContainerResourcePriorityAdapter}.
+ */
+public class TaskExecutorProcessSpecContainerResourcePriorityAdapterTest extends TestLogger {
+
+	private static final Resource MAX_CONTAINER_RESOURCE = Resource.newInstance(102400, 100);
+
+	private static final TaskExecutorProcessSpec TASK_EXECUTOR_PROCESS_SPEC_1 = new TaskExecutorProcessSpec(
+		new CPUResource(1.0),
+		MemorySize.ofMebiBytes(100),
+		MemorySize.ofMebiBytes(100),
+		MemorySize.ofMebiBytes(100),
+		MemorySize.ofMebiBytes(100),
+		MemorySize.ofMebiBytes(100),
+		MemorySize.ofMebiBytes(100),
+		MemorySize.ofMebiBytes(100),
+		MemorySize.ofMebiBytes(100));
+
+	private static final TaskExecutorProcessSpec TASK_EXECUTOR_PROCESS_SPEC_2 = new TaskExecutorProcessSpec(
+		new CPUResource(2.0),
+		MemorySize.ofMebiBytes(200),
+		MemorySize.ofMebiBytes(200),
+		MemorySize.ofMebiBytes(200),
+		MemorySize.ofMebiBytes(200),
+		MemorySize.ofMebiBytes(200),
+		MemorySize.ofMebiBytes(200),
+		MemorySize.ofMebiBytes(200),
+		MemorySize.ofMebiBytes(200));
+
+	@Test
+	public void testGetResource() {
+		final TaskExecutorProcessSpecContainerResourcePriorityAdapter adapter = getAdapter();
+		final Resource resource = adapter.getResource(TASK_EXECUTOR_PROCESS_SPEC_1).get();
+		assertThat(resource.getMemory(), is(TASK_EXECUTOR_PROCESS_SPEC_1.getTotalProcessMemorySize().getMebiBytes()));
+		assertThat(resource.getVirtualCores(), is(TASK_EXECUTOR_PROCESS_SPEC_1.getCpuCores().getValue().intValue()));
+	}
+
+	@Test
+	public void testGetPriority() {
+		final TaskExecutorProcessSpecContainerResourcePriorityAdapter adapter = getAdapter();
+		final Priority priority1 = adapter.getPriority(TASK_EXECUTOR_PROCESS_SPEC_1).get();
+		final Priority priority2 = adapter.getPriority(TASK_EXECUTOR_PROCESS_SPEC_2).get();
+		final Priority priority3 = adapter.getPriority(TASK_EXECUTOR_PROCESS_SPEC_1).get();
+		assertThat(priority1, not(priority2));
+		assertThat(priority1, is(priority3));
+	}
+
+	@Test
+	public void testMaxContainerResource() {
+		final TaskExecutorProcessSpecContainerResourcePriorityAdapter adapter =
+			new TaskExecutorProcessSpecContainerResourcePriorityAdapter(
+				1,
+				Resource.newInstance(100, 1),
+				Collections.emptyMap());
+		assertThat(adapter.getResource(TASK_EXECUTOR_PROCESS_SPEC_2).isPresent(), is(false));
+		assertThat(adapter.getPriority(TASK_EXECUTOR_PROCESS_SPEC_2).isPresent(), is(false));

Review comment:
       It seems we could also use `getAdapter` here. We would just need a `TASK_EXECUTOR_PROCESS_SPEC_3` with a `CPUResource` of 300.

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -349,32 +356,21 @@ private void startTaskExecutorInContainerAsync(
 			}, getMainThreadExecutor()));
 	}
 
-	private Collection<AMRMClient.ContainerRequest> getPendingRequestsAndCheckConsistency(Resource resource, int expectedNum) {
-		final Collection<Resource> equivalentResources = taskExecutorProcessSpecContainerResourceAdapter.getEquivalentContainerResource(resource, matchingStrategy);
-		final List<? extends Collection<AMRMClient.ContainerRequest>> matchingRequests =
-			equivalentResources.stream()
-				.flatMap(equivalentResource -> resourceManagerClient.getMatchingRequests(
-					RM_REQUEST_PRIORITY,
-					ResourceRequest.ANY,
-					equivalentResource).stream())
+	private Collection<AMRMClient.ContainerRequest> getPendingRequestsAndCheckConsistency(
+			Priority priority, Resource resource, int expectedNum) {
+		final List<AMRMClient.ContainerRequest> matchingRequests =
+			resourceManagerClient.getMatchingRequests(priority, ResourceRequest.ANY, resource)
+				.stream()
+				.flatMap(requests -> requests.stream())

Review comment:
       ```suggestion
   				.flatMap(Collection::stream)
   ```
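
       For illustration, a small self-contained sketch showing that the method reference behaves the same as the lambda `requests -> requests.stream()`. `FlatMapSketch` and `flatten` are hypothetical names and are not part of the PR.

```java
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: flattens a collection of collections into one list,
// preserving encounter order.
final class FlatMapSketch {

    static <T> List<T> flatten(Collection<? extends Collection<T>> groups) {
        return groups.stream()
            // Equivalent to .flatMap(group -> group.stream())
            .flatMap(Collection::stream)
            .collect(Collectors.toList());
    }
}
```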

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -260,36 +257,47 @@ public void releaseResource(YarnWorkerNode workerNode) {
 	//  Internal
 	// ------------------------------------------------------------------------
 
-	private void onContainersOfResourceAllocated(Resource resource, List<Container> containers) {
-		final List<TaskExecutorProcessSpec> pendingTaskExecutorProcessSpecs =
-			taskExecutorProcessSpecContainerResourceAdapter.getTaskExecutorProcessSpec(resource, matchingStrategy).stream()
-				.flatMap(spec -> Collections.nCopies(getNumRequestedNotAllocatedWorkersFor(spec), spec).stream())
-				.collect(Collectors.toList());
+	private void onContainersOfPriorityAllocated(Priority priority, List<Container> containers) {
+		final Optional<TaskExecutorProcessSpec> taskExecutorProcessSpecOpt =
+			taskExecutorProcessSpecContainerResourcePriorityAdapter.getTaskExecutorProcessSpec(priority);
+
+		if (!taskExecutorProcessSpecOpt.isPresent()) {
+			log.warn("Receive {} containers with unrecognized priority {}. This should not happen.",
+				containers.size(), priority.getPriority());
+			for (Container container : containers) {
+				returnExcessContainer(container);
+			}
+			return;
+		}
 
-		int numPending = pendingTaskExecutorProcessSpecs.size();
-		log.info("Received {} containers with resource {}, {} pending container requests.",
+		final TaskExecutorProcessSpec taskExecutorProcessSpec = taskExecutorProcessSpecOpt.get();
+
+		final Optional<Resource> resourceOpt = taskExecutorProcessSpecContainerResourcePriorityAdapter.getResource(taskExecutorProcessSpec);
+		Preconditions.checkState(resourceOpt.isPresent());

Review comment:
       If `taskExecutorProcessSpecOpt` is present, `resourceOpt` should also be present by design. I think we could add `taskExecutorProcessSpecContainerResourcePriorityAdapter#getTaskExecutorProcessSpecAndResource`, which directly returns an optional `Tuple2`. Then we would not need to call `taskExecutorProcessSpecContainerResourcePriorityAdapter.getResource(taskExecutorProcessSpec)` and check whether `resourceOpt` is present.
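
       A rough sketch of the combined lookup, under stated assumptions: `SpecAndResourceLookupSketch` and the `SpecAndResource` pair class are hypothetical (the pair stands in for a `Tuple2`), `String` stands in for `TaskExecutorProcessSpec` and `Resource`, and an `int` stands in for `Priority`. Only the method name `getTaskExecutorProcessSpecAndResource` comes from the comment above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: the priority maps to a (spec, resource) pair, so a
// single Optional answers both questions at once.
final class SpecAndResourceLookupSketch {

    /** Hypothetical pair type standing in for a Tuple2 of spec and resource. */
    static final class SpecAndResource {
        final String spec;
        final String resource;

        SpecAndResource(String spec, String resource) {
            this.spec = spec;
            this.resource = resource;
        }
    }

    private final Map<Integer, SpecAndResource> priorityToSpecAndResource = new HashMap<>();

    void register(int priority, String spec, String resource) {
        priorityToSpecAndResource.put(priority, new SpecAndResource(spec, resource));
    }

    /** Returns both values together, or empty if the priority is unknown. */
    Optional<SpecAndResource> getTaskExecutorProcessSpecAndResource(int priority) {
        return Optional.ofNullable(priorityToSpecAndResource.get(priority));
    }
}
```

       A caller would then need only one `isPresent()` check for an unrecognized priority, and the separate `Preconditions.checkState(resourceOpt.isPresent())` would no longer be necessary.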



