You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/05/26 10:45:29 UTC
[flink] branch release-1.11 updated: [FLINK-17917][yarn] Ignore the
external resource with a value of 0 in constructing
InternalContainerResource
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 4181bb4 [FLINK-17917][yarn] Ignore the external resource with a value of 0 in constructing InternalContainerResource
4181bb4 is described below
commit 4181bb4ddb435d457a9c1c61074c131da2a96238
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Mon May 25 14:24:59 2020 +0800
[FLINK-17917][yarn] Ignore the external resource with a value of 0 in constructing InternalContainerResource
This closes #12315.
---
.../yarn/WorkerSpecContainerResourceAdapter.java | 12 +++++++++---
.../WorkerSpecContainerResourceAdapterTest.java | 21 +++++++++++++++++++++
2 files changed, 30 insertions(+), 3 deletions(-)
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java b/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
index ad48a47..d563d92 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
@@ -18,6 +18,7 @@
package org.apache.flink.yarn;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
@@ -166,15 +167,20 @@ class WorkerSpecContainerResourceAdapter {
* This class is for {@link WorkerSpecContainerResourceAdapter} internal usages only, to overcome the problem that
* hash codes are calculated inconsistently across different {@link Resource} implementations.
*/
- private static final class InternalContainerResource {
+ @VisibleForTesting
+ static final class InternalContainerResource {
private final int memory;
private final int vcores;
private final Map<String, Long> externalResources;
- private InternalContainerResource(final int memory, final int vcores, final Map<String, Long> externalResources) {
+ @VisibleForTesting
+ InternalContainerResource(final int memory, final int vcores, final Map<String, Long> externalResources) {
this.memory = memory;
this.vcores = vcores;
- this.externalResources = externalResources;
+ this.externalResources = externalResources.entrySet()
+ .stream()
+ .filter(entry -> !entry.getValue().equals(0L))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
private InternalContainerResource(final Resource resource) {
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
index 771165a..1f466f4 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
@@ -29,12 +29,15 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.junit.Test;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Optional;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -238,6 +241,24 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
assertThat(adapter.getWorkerSpecs(resourceImpl2, strategy), contains(workerSpec));
}
+ @Test
+ public void testMatchInternalContainerResourceIgnoresZeroValueExternalResources() {
+ final Map<String, Long> externalResources1 = new HashMap<>();
+ final Map<String, Long> externalResources2 = new HashMap<>();
+
+ externalResources1.put("foo", 0L);
+ externalResources1.put("bar", 1L);
+ externalResources2.put("zoo", 0L);
+ externalResources2.put("bar", 1L);
+
+ final WorkerSpecContainerResourceAdapter.InternalContainerResource internalContainerResource1 =
+ new WorkerSpecContainerResourceAdapter.InternalContainerResource(1024, 1, externalResources1);
+ final WorkerSpecContainerResourceAdapter.InternalContainerResource internalContainerResource2 =
+ new WorkerSpecContainerResourceAdapter.InternalContainerResource(1024, 1, externalResources2);
+
+ assertEquals(internalContainerResource1, internalContainerResource2);
+ }
+
private Configuration getConfigProcessSpecEqualsWorkerSpec() {
final Configuration config = new Configuration();
config.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, MemorySize.ZERO);