You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 11:48:19 UTC
[14/16] flink git commit: [FLINK-6217]
ContaineredTaskManagerParameters sets off-heap memory size incorrectly.
[FLINK-6217] ContaineredTaskManagerParameters sets off-heap memory size incorrectly.
This closes #3648.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5cf22f41
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5cf22f41
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5cf22f41
Branch: refs/heads/master
Commit: 5cf22f411962a1199da7843252386b77e2461851
Parents: 9375808
Author: Haohui Mai <wh...@apache.org>
Authored: Wed Mar 29 16:57:52 2017 -0700
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri May 5 11:08:10 2017 +0200
----------------------------------------------------------------------
.../clusterframework/LaunchableMesosWorker.java | 4 +-
.../clusterframework/BootstrapTools.java | 18 +++++--
.../ContaineredTaskManagerParameters.java | 5 +-
.../ContaineredTaskManagerParametersTest.java | 49 ++++++++++++++++++++
4 files changed, 69 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5cf22f41/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 3d5350a..2408ac6 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -223,7 +223,9 @@ public class LaunchableMesosWorker implements LaunchableTask {
// finalize the memory parameters
jvmArgs.append(" -Xms").append(tmParams.taskManagerHeapSizeMB()).append("m");
jvmArgs.append(" -Xmx").append(tmParams.taskManagerHeapSizeMB()).append("m");
- jvmArgs.append(" -XX:MaxDirectMemorySize=").append(tmParams.taskManagerDirectMemoryLimitMB()).append("m");
+ if (tmParams.taskManagerDirectMemoryLimitMB() >= 0) {
+ jvmArgs.append(" -XX:MaxDirectMemorySize=").append(tmParams.taskManagerDirectMemoryLimitMB()).append("m");
+ }
// pass dynamic system properties
jvmArgs.append(' ').append(
http://git-wip-us.apache.org/repos/asf/flink/blob/5cf22f41/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 9bcaa18..e9d3cbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -25,6 +25,7 @@ import com.typesafe.config.Config;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
@@ -46,6 +47,7 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.net.BindException;
import java.net.ServerSocket;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -358,10 +360,18 @@ public class BootstrapTools {
final Map<String, String> startCommandValues = new HashMap<>();
startCommandValues.put("java", "$JAVA_HOME/bin/java");
- startCommandValues
- .put("jvmmem", "-Xms" + tmParams.taskManagerHeapSizeMB() + "m " +
- "-Xmx" + tmParams.taskManagerHeapSizeMB() + "m " +
- "-XX:MaxDirectMemorySize=" + tmParams.taskManagerDirectMemoryLimitMB() + "m");
+
+ ArrayList<String> params = new ArrayList<>();
+ params.add(String.format("-Xms%dm", tmParams.taskManagerHeapSizeMB()));
+ params.add(String.format("-Xmx%dm", tmParams.taskManagerHeapSizeMB()));
+
+ if (tmParams.taskManagerDirectMemoryLimitMB() >= 0) {
+ params.add(String.format("-XX:MaxDirectMemorySize=%dm",
+ tmParams.taskManagerDirectMemoryLimitMB()));
+ }
+
+ startCommandValues.put("jvmmem", StringUtils.join(params, ' '));
+
String javaOpts = flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS);
if (flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS).length() > 0) {
javaOpts += " " + flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS);
http://git-wip-us.apache.org/repos/asf/flink/blob/5cf22f41/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
index 0fc0870..8ff3c25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
@@ -146,8 +146,9 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);
final long heapSizeMB;
+ long offHeapSize = -1;
if (useOffHeap) {
- long offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+ offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
if (offHeapSize <= 0) {
double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
@@ -174,6 +175,6 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
// done
return new ContaineredTaskManagerParameters(
- containerMemoryMB, heapSizeMB, javaMemorySizeMB, numSlots, envVars);
+ containerMemoryMB, heapSizeMB, offHeapSize, numSlots, envVars);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5cf22f41/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
new file mode 100644
index 0000000..c0c48f9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import static org.apache.flink.configuration.ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY;
+import static org.apache.flink.configuration.TaskManagerOptions.MANAGED_MEMORY_SIZE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ContaineredTaskManagerParametersTest {
+ private static final long CONTAINER_MEMORY = 8192;
+
+ @Test
+ public void testDefaultOffHeapMemory() {
+ Configuration conf = new Configuration();
+ ContaineredTaskManagerParameters params =
+ ContaineredTaskManagerParameters.create(conf, CONTAINER_MEMORY, 1);
+ assertEquals(-1, params.taskManagerDirectMemoryLimitMB());
+ }
+
+ @Test
+ public void testTotalMemoryDoesNotExceedContainerMemory() {
+ Configuration conf = new Configuration();
+ conf.setBoolean(MANAGED_MEMORY_SIZE.key(), true);
+ ContaineredTaskManagerParameters params =
+ ContaineredTaskManagerParameters.create(conf, CONTAINER_MEMORY, 1);
+ assertTrue(params.taskManagerHeapSizeMB() +
+ params.taskManagerDirectMemoryLimitMB() <= CONTAINER_MEMORY);
+ }
+}