You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/06 10:22:28 UTC

flink git commit: [hotfix] [tests] Speed up TaskManagerServicesTest

Repository: flink
Updated Branches:
  refs/heads/master f6e24ab60 -> 4f32a796f


[hotfix] [tests] Speed up TaskManagerServicesTest

The TaskManagerServicesTest was extremely slow because it uses the PowerMockRunner
for a single test case. Additionally some other tests test different settings in
a loop which was quite slow because the PowerMockRunner instrumented all method calls.
Moving the test which required the PowerMockRunner into a separate test class sped up
things by a factor of 50.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4f32a796
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4f32a796
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4f32a796

Branch: refs/heads/master
Commit: 4f32a796f36555e46053a386b813e62cb886d273
Parents: f6e24ab
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Dec 6 11:16:36 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Dec 6 11:18:29 2017 +0100

----------------------------------------------------------------------
 .../NetworkBufferCalculationTest.java           | 131 +++++++++++++++++++
 .../taskexecutor/TaskManagerServicesTest.java   | 118 +++--------------
 2 files changed, 147 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4f32a796/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
new file mode 100644
index 0000000..eb879c4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.net.InetAddress;
+
+import static org.junit.Assert.assertEquals;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests the network buffer calculation from heap size which requires mocking
+ * to have control over the {@link EnvironmentInformation}.
+ *
+ *
+ * <p>This should be refactored once we have pulled {@link EnvironmentInformation} out of
+ * {@link TaskManagerServices}. See FLINK-8212 for more information.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EnvironmentInformation.class)
+public class NetworkBufferCalculationTest extends TestLogger {
+
+	/**
+	 * Test for {@link TaskManagerServices#calculateNetworkBufferMemory(TaskManagerServicesConfiguration)}
+	 * using the same (manual) test cases as in {@link TaskManagerServicesTest#calculateHeapSizeMB()}.
+	 */
+	@Test
+	public void calculateNetworkBufFromHeapSize() throws Exception {
+		PowerMockito.mockStatic(EnvironmentInformation.class);
+		// some defaults:
+		when(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()).thenReturn(1000L << 20); // 1000MB
+		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(1000L << 20); // 1000MB
+
+		TaskManagerServicesConfiguration tmConfig;
+
+		tmConfig = getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(),
+			TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
+			0.1f, 60L << 20, 1L << 30, MemoryType.HEAP);
+		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(900L << 20); // 900MB
+		assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
+			TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
+
+		tmConfig = getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(),
+			TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
+			0.2f, 60L << 20, 1L << 30, MemoryType.HEAP);
+		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(800L << 20); // 800MB
+		assertEquals((200L << 20) + 3 /* slightly too many due to floating point imprecision */,
+			TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
+
+		tmConfig = getTmConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
+			0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
+		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(890L << 20); // 890MB
+		assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
+			TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
+
+		tmConfig = getTmConfig(-1, 0.1f,
+			0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
+		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(810L << 20); // 810MB
+		assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
+			TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
+	}
+
+	/**
+	 * Returns a task manager services configuration for the tests
+	 *
+	 * @param managedMemory         see {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}
+	 * @param managedMemoryFraction see {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION}
+	 * @param networkBufFraction	see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION}
+	 * @param networkBufMin			see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN}
+	 * @param networkBufMax			see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}
+	 * @param memType				on-heap or off-heap
+	 *
+	 * @return configuration object
+	 */
+	private static TaskManagerServicesConfiguration getTmConfig(
+		final long managedMemory, final float managedMemoryFraction, float networkBufFraction,
+		long networkBufMin, long networkBufMax,
+		final MemoryType memType) {
+
+		final NetworkEnvironmentConfiguration networkConfig = new NetworkEnvironmentConfiguration(
+			networkBufFraction,
+			networkBufMin,
+			networkBufMax,
+			TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(),
+			null,
+			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
+			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
+			TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),
+			TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(),
+			null);
+
+		return new TaskManagerServicesConfiguration(
+			InetAddress.getLoopbackAddress(),
+			new String[] {},
+			networkConfig,
+			QueryableStateConfiguration.disabled(),
+			1,
+			managedMemory,
+			memType,
+			false,
+			managedMemoryFraction,
+			0);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4f32a796/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
index 1ec280d..f47608c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
@@ -20,30 +20,20 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
-import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.net.InetAddress;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
+import static org.junit.Assert.fail;
 
 /**
  * Unit test for {@link TaskManagerServices}.
  */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(EnvironmentInformation.class)
-public class TaskManagerServicesTest {
+public class TaskManagerServicesTest extends TestLogger{
 
 	/**
 	 * Test for {@link TaskManagerServices#calculateNetworkBufferMemory(long, Configuration)} using old
@@ -120,14 +110,20 @@ public class TaskManagerServicesTest {
 			long javaMem = Math.max(max + 1, ran.nextLong());
 
 			final long networkBufMem = TaskManagerServices.calculateNetworkBufferMemory(javaMem, config);
-			assertTrue("Lower bound not met with configuration: " + config.toString(),
-				networkBufMem >= min);
-			assertTrue("Upper bound not met with configuration: " + config.toString(),
-				networkBufMem <= max);
+
+			if (networkBufMem < min) {
+				fail("Lower bound not met with configuration: " + config.toString());
+			}
+
+			if (networkBufMem > max) {
+				fail("Upper bound not met with configuration: " + config.toString());
+			}
+
 			if (networkBufMem > min && networkBufMem < max) {
-				assertEquals(
-					"Wrong network buffer memory size with configuration: " + config.toString(),
-					(long) (javaMem * frac), networkBufMem);
+				if ((javaMem  * frac) != networkBufMem) {
+					fail("Wrong network buffer memory size with configuration: " + config.toString() +
+					". Expected value: " + (javaMem * frac) + " actual value: " + networkBufMem + '.');
+				}
 			}
 		}
 	}
@@ -189,88 +185,6 @@ public class TaskManagerServicesTest {
 	}
 
 	/**
-	 * Test for {@link TaskManagerServices#calculateNetworkBufferMemory(TaskManagerServicesConfiguration)}
-	 * using the same (manual) test cases as in {@link #calculateHeapSizeMB()}.
-	 */
-	@Test
-	public void calculateNetworkBufFromHeapSize() throws Exception {
-		PowerMockito.mockStatic(EnvironmentInformation.class);
-		// some defaults:
-		when(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()).thenReturn(1000L << 20); // 1000MB
-		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(1000L << 20); // 1000MB
-
-		TaskManagerServicesConfiguration tmConfig;
-
-		tmConfig = getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(),
-			TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
-			0.1f, 60L << 20, 1L << 30, MemoryType.HEAP);
-		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(900L << 20); // 900MB
-		assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
-			TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
-
-		tmConfig = getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(),
-			TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
-			0.2f, 60L << 20, 1L << 30, MemoryType.HEAP);
-		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(800L << 20); // 800MB
-		assertEquals((200L << 20) + 3 /* slightly too many due to floating point imprecision */,
-			TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
-
-		tmConfig = getTmConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
-			0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
-		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(890L << 20); // 890MB
-		assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
-			TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
-
-		tmConfig = getTmConfig(-1, 0.1f,
-			0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
-		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(810L << 20); // 810MB
-		assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
-			TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
-	}
-
-	/**
-	 * Returns a task manager services configuration for the tests
-	 *
-	 * @param managedMemory         see {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}
-	 * @param managedMemoryFraction see {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION}
-	 * @param networkBufFraction	see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION}
-	 * @param networkBufMin			see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN}
-	 * @param networkBufMax			see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}
-	 * @param memType				on-heap or off-heap
-	 *
-	 * @return configuration object
-	 */
-	private static TaskManagerServicesConfiguration getTmConfig(
-		final long managedMemory, final float managedMemoryFraction, float networkBufFraction,
-		long networkBufMin, long networkBufMax,
-		final MemoryType memType) {
-
-		final NetworkEnvironmentConfiguration networkConfig = new NetworkEnvironmentConfiguration(
-			networkBufFraction,
-			networkBufMin,
-			networkBufMax,
-			TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(),
-			null,
-			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
-			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
-			TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),
-			TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(),
-			null);
-
-		return new TaskManagerServicesConfiguration(
-			mock(InetAddress.class),
-			new String[] {},
-			networkConfig,
-			QueryableStateConfiguration.disabled(),
-			1,
-			managedMemory,
-			memType,
-			false,
-			managedMemoryFraction,
-			0);
-	}
-
-	/**
 	 * Test for {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)} with some
 	 * manually calculated scenarios.
 	 */