You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/02/07 17:13:45 UTC

[flink] 01/03: [FLINK-15942] Do not log huge/infinite cpu/memory resources in profiles

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 67e1052f045fa0617ed672ff45ed91af7379d6ed
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Fri Feb 7 16:39:13 2020 +0300

    [FLINK-15942] Do not log huge/infinite cpu/memory resources in profiles
---
 .../clusterframework/types/ResourceProfile.java    | 37 ++++++++++++++++------
 .../types/ResourceProfileTest.java                 | 37 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 10 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index b77b00a..8a68728 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -78,6 +78,12 @@ public class ResourceProfile implements Serializable {
 	/** A ResourceProfile describing zero resources. */
 	public static final ResourceProfile ZERO = newBuilder().build();
 
+	/** Maximum number of cpu cores to output in {@link #toString()}. */
+	static final BigDecimal MAX_CPU_CORE_NUMBER_TO_LOG = new BigDecimal(16384);
+
+	/** Maximum memory resource size to output in {@link #toString()}. */
+	static final MemorySize MAX_MEMORY_SIZE_TO_LOG = new MemorySize(1L << 50); // 1Pb
+
 	// ------------------------------------------------------------------------
 
 	/** How many cpu cores are needed. Can be null only if it is unknown. */
@@ -388,17 +394,28 @@ public class ResourceProfile implements Serializable {
 			return "ResourceProfile{ANY}";
 		}
 
-		final StringBuilder resources = new StringBuilder(extendedResources.size() * 10);
+		final StringBuilder extendedResourceStr = new StringBuilder(extendedResources.size() * 10);
 		for (Map.Entry<String, Resource> resource : extendedResources.entrySet()) {
-			resources.append(", ").append(resource.getKey()).append('=').append(resource.getValue().getValue());
-		}
-		return "ResourceProfile{" +
-			"cpuCores=" + cpuCores.getValue() +
-			", taskHeapMemory=" + taskHeapMemory.toHumanReadableString() +
-			", taskOffHeapMemory=" + taskOffHeapMemory.toHumanReadableString() +
-			", managedMemory=" + managedMemory.toHumanReadableString() +
-			", networkMemory=" + networkMemory.toHumanReadableString() + resources +
-			'}';
+			extendedResourceStr.append(", ").append(resource.getKey()).append('=').append(resource.getValue().getValue());
+		}
+		return "ResourceProfile{" + getResourceString() + extendedResourceStr + '}';
+	}
+
+	private String getResourceString() {
+		String resourceStr = cpuCores == null || cpuCores.getValue().compareTo(MAX_CPU_CORE_NUMBER_TO_LOG) > 0 ?
+			"" : "cpuCores=" + cpuCores.getValue();
+		resourceStr = addMemorySizeString(resourceStr, "taskHeapMemory", taskHeapMemory);
+		resourceStr = addMemorySizeString(resourceStr, "taskOffHeapMemory", taskOffHeapMemory);
+		resourceStr = addMemorySizeString(resourceStr, "managedMemory", managedMemory);
+		resourceStr = addMemorySizeString(resourceStr, "networkMemory", networkMemory);
+		return resourceStr;
+	}
+
+	private static String addMemorySizeString(String resourceStr, String name, MemorySize size) {
+		String comma = resourceStr.isEmpty() ? "" : ", ";
+		String memorySizeStr = size == null || size.compareTo(MAX_MEMORY_SIZE_TO_LOG) > 0 ?
+			"" : comma + name + '=' + size.toHumanReadableString();
+		return resourceStr + memorySizeStr;
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
index 1709fd6..6693473 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -29,10 +29,14 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.flink.runtime.clusterframework.types.ResourceProfile.MAX_CPU_CORE_NUMBER_TO_LOG;
+import static org.apache.flink.runtime.clusterframework.types.ResourceProfile.MAX_MEMORY_SIZE_TO_LOG;
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -40,6 +44,7 @@ import static org.junit.Assert.fail;
  * Tests for the {@link ResourceProfile}.
  */
 public class ResourceProfileTest {
+	private static final MemorySize NOT_LOGGED_MEMORY = MAX_MEMORY_SIZE_TO_LOG.add(MemorySize.ofMebiBytes(10));
 
 	@Test
 	public void testMatchRequirement() {
@@ -387,4 +392,36 @@ public class ResourceProfileTest {
 
 		assertSame(ResourceProfile.ANY, copiedProfile);
 	}
+
+	@Test
+	public void testToStringWithoutCpu() {
+		double notLoggedCpu = MAX_CPU_CORE_NUMBER_TO_LOG.doubleValue() + 1.0;
+		ResourceProfile resourceProfile = getResourceProfileForLogsWithCpuVal(notLoggedCpu);
+		assertThat(resourceProfile.toString(), is(getExpectedToString(resourceProfile, "")));
+	}
+
+	@Test
+	public void testToStringWithCpu() {
+		ResourceProfile resourceProfile = getResourceProfileForLogsWithCpuVal(1.0);
+		assertThat(resourceProfile.toString(), is(getExpectedToString(resourceProfile, "cpuCores=1.0, ")));
+	}
+
+	private static ResourceProfile getResourceProfileForLogsWithCpuVal(double cpu) {
+		return ResourceProfile
+			.newBuilder()
+			.setCpuCores(cpu)
+			.setTaskHeapMemoryMB(2)
+			.setTaskOffHeapMemory(NOT_LOGGED_MEMORY)
+			.setManagedMemoryMB(4)
+			.setNetworkMemory(NOT_LOGGED_MEMORY)
+			.build();
+	}
+
+	private static String getExpectedToString(ResourceProfile resourceProfile, String expectedCpuStr) {
+		return String.format(
+			"ResourceProfile{%staskHeapMemory=%s, managedMemory=%s}",
+			expectedCpuStr,
+			resourceProfile.getTaskHeapMemory().toHumanReadableString(),
+			resourceProfile.getManagedMemory().toHumanReadableString());
+	}
 }