You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/02/07 17:13:45 UTC
[flink] 01/03: [FLINK-15942] Do not log huge/infinite cpu/memory
resources in profiles
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 67e1052f045fa0617ed672ff45ed91af7379d6ed
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Fri Feb 7 16:39:13 2020 +0300
[FLINK-15942] Do not log huge/infinite cpu/memory resources in profiles
---
.../clusterframework/types/ResourceProfile.java | 37 ++++++++++++++++------
.../types/ResourceProfileTest.java | 37 ++++++++++++++++++++++
2 files changed, 64 insertions(+), 10 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index b77b00a..8a68728 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -78,6 +78,12 @@ public class ResourceProfile implements Serializable {
/** A ResourceProfile describing zero resources. */
public static final ResourceProfile ZERO = newBuilder().build();
+ /** Maximum number of cpu cores to output in {@link #toString()}; larger cpu values are omitted from the string. */
+ static final BigDecimal MAX_CPU_CORE_NUMBER_TO_LOG = new BigDecimal(16384);
+
+ /** Maximum memory resource size to output in {@link #toString()}; larger memory sizes are omitted from the string. */
+ static final MemorySize MAX_MEMORY_SIZE_TO_LOG = new MemorySize(1L << 50); // 2^50, i.e. 1 PiB assuming the constructor takes a byte count (TODO confirm)
+
// ------------------------------------------------------------------------
/** How many cpu cores are needed. Can be null only if it is unknown. */
@@ -388,17 +394,28 @@ public class ResourceProfile implements Serializable {
return "ResourceProfile{ANY}";
}
- final StringBuilder resources = new StringBuilder(extendedResources.size() * 10);
+ final StringBuilder extendedResourceStr = new StringBuilder(extendedResources.size() * 10);
for (Map.Entry<String, Resource> resource : extendedResources.entrySet()) {
- resources.append(", ").append(resource.getKey()).append('=').append(resource.getValue().getValue());
- }
- return "ResourceProfile{" +
- "cpuCores=" + cpuCores.getValue() +
- ", taskHeapMemory=" + taskHeapMemory.toHumanReadableString() +
- ", taskOffHeapMemory=" + taskOffHeapMemory.toHumanReadableString() +
- ", managedMemory=" + managedMemory.toHumanReadableString() +
- ", networkMemory=" + networkMemory.toHumanReadableString() + resources +
- '}';
+ extendedResourceStr.append(", ").append(resource.getKey()).append('=').append(resource.getValue().getValue());
+ }
+ return "ResourceProfile{" + getResourceString() + extendedResourceStr + '}';
+ }
+
+ /**
+  * Renders the cpu and memory part of {@link #toString()} as a comma-separated list.
+  *
+  * <p>The cpu entry is left out when {@code cpuCores} is {@code null} or greater than
+  * {@link #MAX_CPU_CORE_NUMBER_TO_LOG}; each memory entry is filtered by
+  * {@code addMemorySizeString}.
+  *
+  * <p>NOTE(review): when every entry is filtered out this returns an empty string, while
+  * {@code toString()} still puts ", " in front of the first extended resource.
+  *
+  * @return the rendered entries, possibly empty
+  */
+ private String getResourceString() {
+     String rendered;
+     if (cpuCores != null && cpuCores.getValue().compareTo(MAX_CPU_CORE_NUMBER_TO_LOG) <= 0) {
+         rendered = "cpuCores=" + cpuCores.getValue();
+     } else {
+         // Unknown or implausibly large cpu values are not worth logging.
+         rendered = "";
+     }
+     rendered = addMemorySizeString(rendered, "taskHeapMemory", taskHeapMemory);
+     rendered = addMemorySizeString(rendered, "taskOffHeapMemory", taskOffHeapMemory);
+     rendered = addMemorySizeString(rendered, "managedMemory", managedMemory);
+     rendered = addMemorySizeString(rendered, "networkMemory", networkMemory);
+     return rendered;
+ }
+
+ /**
+  * Appends {@code name=size} to the given resource string, separated by ", " unless the
+  * string is still empty.
+  *
+  * @param resourceStr entries rendered so far
+  * @param name label of the memory entry
+  * @param size memory size to render; skipped when {@code null} or greater than
+  *     {@link #MAX_MEMORY_SIZE_TO_LOG}
+  * @return {@code resourceStr}, extended by the entry when it is not skipped
+  */
+ private static String addMemorySizeString(String resourceStr, String name, MemorySize size) {
+     final String separator = resourceStr.isEmpty() ? "" : ", ";
+     if (size == null || size.compareTo(MAX_MEMORY_SIZE_TO_LOG) > 0) {
+         // Nothing to add for unknown or over-limit sizes.
+         return resourceStr;
+     }
+     return resourceStr + separator + name + '=' + size.toHumanReadableString();
}
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
index 1709fd6..6693473 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -29,10 +29,14 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.flink.runtime.clusterframework.types.ResourceProfile.MAX_CPU_CORE_NUMBER_TO_LOG;
+import static org.apache.flink.runtime.clusterframework.types.ResourceProfile.MAX_MEMORY_SIZE_TO_LOG;
+import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -40,6 +44,7 @@ import static org.junit.Assert.fail;
* Tests for the {@link ResourceProfile}.
*/
public class ResourceProfileTest {
+ private static final MemorySize NOT_LOGGED_MEMORY = MAX_MEMORY_SIZE_TO_LOG.add(MemorySize.ofMebiBytes(10)); // exceeds MAX_MEMORY_SIZE_TO_LOG, so toString() is expected to omit it
@Test
public void testMatchRequirement() {
@@ -387,4 +392,36 @@ public class ResourceProfileTest {
assertSame(ResourceProfile.ANY, copiedProfile);
}
+
+ /** A cpu value above {@code MAX_CPU_CORE_NUMBER_TO_LOG} must not appear in {@code toString()}. */
+ @Test
+ public void testToStringWithoutCpu() {
+     final double cpuAboveLimit = MAX_CPU_CORE_NUMBER_TO_LOG.doubleValue() + 1.0;
+     final ResourceProfile profile = getResourceProfileForLogsWithCpuVal(cpuAboveLimit);
+     final String actual = profile.toString();
+     final String expected = getExpectedToString(profile, "");
+     assertThat(actual, is(expected));
+ }
+
+ /** A cpu value within the logging limit is printed as the first entry of {@code toString()}. */
+ @Test
+ public void testToStringWithCpu() {
+     final ResourceProfile profile = getResourceProfileForLogsWithCpuVal(1.0);
+     final String actual = profile.toString();
+     final String expected = getExpectedToString(profile, "cpuCores=1.0, ");
+     assertThat(actual, is(expected));
+ }
+
+ /**
+  * Builds the profile used by the toString tests: the cpu value is supplied by the caller,
+  * task heap and managed memory get small values, and task off-heap and network memory are
+  * set to {@code NOT_LOGGED_MEMORY}.
+  */
+ private static ResourceProfile getResourceProfileForLogsWithCpuVal(double cpu) {
+ return ResourceProfile
+ .newBuilder()
+ .setCpuCores(cpu)
+ .setTaskHeapMemoryMB(2)
+ .setTaskOffHeapMemory(NOT_LOGGED_MEMORY) // above the logging limit
+ .setManagedMemoryMB(4)
+ .setNetworkMemory(NOT_LOGGED_MEMORY) // above the logging limit
+ .build();
+ }
+
+ /**
+  * Builds the string the toString tests expect: only task heap and managed memory are listed,
+  * preceded by {@code expectedCpuStr} (empty, or a cpu entry ending in ", ").
+  */
+ private static String getExpectedToString(ResourceProfile resourceProfile, String expectedCpuStr) {
+     final String heapText = resourceProfile.getTaskHeapMemory().toHumanReadableString();
+     final String managedText = resourceProfile.getManagedMemory().toHumanReadableString();
+     return String.format(
+         "ResourceProfile{%staskHeapMemory=%s, managedMemory=%s}",
+         expectedCpuStr,
+         heapText,
+         managedText);
+ }
}