You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/05/26 10:45:29 UTC

[flink] branch release-1.11 updated: [FLINK-17917][yarn] Ignore the external resource with a value of 0 in constructing InternalContainerResource

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 4181bb4  [FLINK-17917][yarn] Ignore the external resource with a value of 0 in constructing InternalContainerResource
4181bb4 is described below

commit 4181bb4ddb435d457a9c1c61074c131da2a96238
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Mon May 25 14:24:59 2020 +0800

    [FLINK-17917][yarn] Ignore the external resource with a value of 0 in constructing InternalContainerResource
    
    This closes #12315.
---
 .../yarn/WorkerSpecContainerResourceAdapter.java    | 12 +++++++++---
 .../WorkerSpecContainerResourceAdapterTest.java     | 21 +++++++++++++++++++++
 2 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java b/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
index ad48a47..d563d92 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
@@ -166,15 +167,20 @@ class WorkerSpecContainerResourceAdapter {
 	 * This class is for {@link WorkerSpecContainerResourceAdapter} internal usages only, to overcome the problem that
 	 * hash codes are calculated inconsistently across different {@link Resource} implementations.
 	 */
-	private static final class InternalContainerResource {
+	@VisibleForTesting
+	static final class InternalContainerResource {
 		private final int memory;
 		private final int vcores;
 		private final Map<String, Long> externalResources;
 
-		private InternalContainerResource(final int memory, final int vcores, final Map<String, Long> externalResources) {
+		@VisibleForTesting
+		InternalContainerResource(final int memory, final int vcores, final Map<String, Long> externalResources) {
 			this.memory = memory;
 			this.vcores = vcores;
-			this.externalResources = externalResources;
+			this.externalResources = externalResources.entrySet()
+						.stream()
+						.filter(entry -> !entry.getValue().equals(0L))
+						.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 		}
 
 		private InternalContainerResource(final Resource resource) {
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
index 771165a..1f466f4 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
@@ -29,12 +29,15 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.junit.Test;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -238,6 +241,24 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
 		assertThat(adapter.getWorkerSpecs(resourceImpl2, strategy), contains(workerSpec));
 	}
 
+	@Test
+	public void testMatchInternalContainerResourceIgnoresZeroValueExternalResources() {
+		final Map<String, Long> externalResources1 = new HashMap<>();
+		final Map<String, Long> externalResources2 = new HashMap<>();
+
+		externalResources1.put("foo", 0L);
+		externalResources1.put("bar", 1L);
+		externalResources2.put("zoo", 0L);
+		externalResources2.put("bar", 1L);
+
+		final WorkerSpecContainerResourceAdapter.InternalContainerResource internalContainerResource1 =
+			new WorkerSpecContainerResourceAdapter.InternalContainerResource(1024, 1, externalResources1);
+		final WorkerSpecContainerResourceAdapter.InternalContainerResource internalContainerResource2 =
+			new WorkerSpecContainerResourceAdapter.InternalContainerResource(1024, 1, externalResources2);
+
+		assertEquals(internalContainerResource1, internalContainerResource2);
+	}
+
 	private Configuration getConfigProcessSpecEqualsWorkerSpec() {
 		final Configuration config = new Configuration();
 		config.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, MemorySize.ZERO);