You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 11:48:19 UTC

[14/16] flink git commit: [FLINK-6217] ContaineredTaskManagerParameters sets off-heap memory size incorrectly.

[FLINK-6217] ContaineredTaskManagerParameters sets off-heap memory size incorrectly.

This closes #3648.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5cf22f41
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5cf22f41
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5cf22f41

Branch: refs/heads/master
Commit: 5cf22f411962a1199da7843252386b77e2461851
Parents: 9375808
Author: Haohui Mai <wh...@apache.org>
Authored: Wed Mar 29 16:57:52 2017 -0700
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri May 5 11:08:10 2017 +0200

----------------------------------------------------------------------
 .../clusterframework/LaunchableMesosWorker.java |  4 +-
 .../clusterframework/BootstrapTools.java        | 18 +++++--
 .../ContaineredTaskManagerParameters.java       |  5 +-
 .../ContaineredTaskManagerParametersTest.java   | 49 ++++++++++++++++++++
 4 files changed, 69 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5cf22f41/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 3d5350a..2408ac6 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -223,7 +223,9 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		// finalize the memory parameters
 		jvmArgs.append(" -Xms").append(tmParams.taskManagerHeapSizeMB()).append("m");
 		jvmArgs.append(" -Xmx").append(tmParams.taskManagerHeapSizeMB()).append("m");
-		jvmArgs.append(" -XX:MaxDirectMemorySize=").append(tmParams.taskManagerDirectMemoryLimitMB()).append("m");
+		if (tmParams.taskManagerDirectMemoryLimitMB() >= 0) {
+			jvmArgs.append(" -XX:MaxDirectMemorySize=").append(tmParams.taskManagerDirectMemoryLimitMB()).append("m");
+		}
 
 		// pass dynamic system properties
 		jvmArgs.append(' ').append(

http://git-wip-us.apache.org/repos/asf/flink/blob/5cf22f41/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 9bcaa18..e9d3cbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -25,6 +25,7 @@ import com.typesafe.config.Config;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
@@ -46,6 +47,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.BindException;
 import java.net.ServerSocket;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -358,10 +360,18 @@ public class BootstrapTools {
 
 		final Map<String, String> startCommandValues = new HashMap<>();
 		startCommandValues.put("java", "$JAVA_HOME/bin/java");
-		startCommandValues
-			.put("jvmmem", 	"-Xms" + tmParams.taskManagerHeapSizeMB() + "m " +
-							"-Xmx" + tmParams.taskManagerHeapSizeMB() + "m " +
-							"-XX:MaxDirectMemorySize=" + tmParams.taskManagerDirectMemoryLimitMB() + "m");
+
+		ArrayList<String> params = new ArrayList<>();
+		params.add(String.format("-Xms%dm", tmParams.taskManagerHeapSizeMB()));
+		params.add(String.format("-Xmx%dm", tmParams.taskManagerHeapSizeMB()));
+
+		if (tmParams.taskManagerDirectMemoryLimitMB() >= 0) {
+			params.add(String.format("-XX:MaxDirectMemorySize=%dm",
+				tmParams.taskManagerDirectMemoryLimitMB()));
+		}
+
+		startCommandValues.put("jvmmem", StringUtils.join(params, ' '));
+
 		String javaOpts = flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS);
 		if (flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS).length() > 0) {
 			javaOpts += " " + flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS);

http://git-wip-us.apache.org/repos/asf/flink/blob/5cf22f41/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
index 0fc0870..8ff3c25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
@@ -146,8 +146,9 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 		final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);
 
 		final long heapSizeMB;
+		long offHeapSize = -1;
 		if (useOffHeap) {
-			long offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+			offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
 
 			if (offHeapSize <= 0) {
 				double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
@@ -174,6 +175,6 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 		
 		// done
 		return new ContaineredTaskManagerParameters(
-			containerMemoryMB, heapSizeMB, javaMemorySizeMB, numSlots, envVars);
+			containerMemoryMB, heapSizeMB, offHeapSize, numSlots, envVars);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5cf22f41/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
new file mode 100644
index 0000000..c0c48f9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import static org.apache.flink.configuration.ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY;
+import static org.apache.flink.configuration.TaskManagerOptions.MANAGED_MEMORY_SIZE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ContaineredTaskManagerParametersTest {
+	private static final long CONTAINER_MEMORY = 8192;
+
+	@Test
+	public void testDefaultOffHeapMemory() {
+		Configuration conf = new Configuration();
+		ContaineredTaskManagerParameters params =
+			ContaineredTaskManagerParameters.create(conf, CONTAINER_MEMORY, 1);
+		assertEquals(-1, params.taskManagerDirectMemoryLimitMB());
+	}
+
+	@Test
+	public void testTotalMemoryDoesNotExceedContainerMemory() {
+		Configuration conf = new Configuration();
+		conf.setBoolean(MANAGED_MEMORY_SIZE.key(), true);
+		ContaineredTaskManagerParameters params =
+			ContaineredTaskManagerParameters.create(conf, CONTAINER_MEMORY, 1);
+		assertTrue(params.taskManagerHeapSizeMB() +
+			params.taskManagerDirectMemoryLimitMB() <= CONTAINER_MEMORY);
+	}
+}