You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/21 13:50:42 UTC
[4/4] flink git commit: [FLINK-2062] [core] Fix names of memory size config parameter
[FLINK-2062] [core] Fix names of memory size config parameter
The old parameter is still evaluated when it is set and the new one is not.
This closes #703
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/939e3fc4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/939e3fc4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/939e3fc4
Branch: refs/heads/master
Commit: 939e3fc40fbdb05f2a564c241eb2c791e869f2d3
Parents: 7bd7b05
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 20 16:19:22 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 21 12:57:37 2015 +0200
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 16 ++++++-
.../minicluster/LocalFlinkMiniCluster.scala | 24 ++++++++--
.../flink/runtime/taskmanager/TaskManager.scala | 50 +++++++++++++++-----
3 files changed, 71 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/939e3fc4/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 6a94c3d..956943a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -137,11 +137,17 @@ public final class ConfigConstants {
public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers";
/**
- * The config parameter defining the size of the buffers used in the network stack.
+ * Deprecated config parameter defining the size of the buffers used in the network stack.
*/
+ @Deprecated
public static final String TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY = "taskmanager.network.bufferSizeInBytes";
/**
+ * Config parameter defining the size of memory buffers used by the network stack and the memory manager.
+ */
+ public static final String TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY = "taskmanager.memory.segment-size";
+
+ /**
* The implementation to use for spillable/spilled intermediate results, which have both
* synchronous and asynchronous implementations: "sync" or "async".
*/
@@ -255,7 +261,7 @@ public final class ConfigConstants {
*/
public static final String PATH_HADOOP_CONFIG = "fs.hdfs.hadoopconf";
- // ------------------------ File System Bahavior ------------------------
+ // ------------------------ File System Behavior ------------------------
/**
* Key to specify whether the file systems should simply overwrite existing files.
@@ -496,9 +502,15 @@ public final class ConfigConstants {
/**
* Default size of network stack buffers.
*/
+ @Deprecated
public static final int DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE = 32768;
/**
+ * Default size of memory segments in the network stack and the memory manager.
+ */
+ public static final int DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE = 32768;
+
+ /**
* The implementation to use for spillable/spilled intermediate results, which have both
* synchronous and asynchronous implementations: "sync" or "async".
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/939e3fc4/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index d7cd6e9..e2d7cc1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -158,11 +158,25 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
// set this only if no memory was preconfigured
if (config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1) == -1) {
- val bufferMem: Long =
- config.getLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) *
- config.getLong(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE)
+ val bufferSizeNew: Int = config.getInteger(
+ ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, -1)
+
+ val bufferSizeOld: Int = config.getInteger(
+ ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1)
+ val bufferSize: Int =
+ if (bufferSizeNew != -1) {
+ bufferSizeNew
+ }
+ else if (bufferSizeOld == -1) {
+ // nothing has been configured, take the default
+ ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE
+ }
+ else {
+ bufferSizeOld
+ }
+
+ val bufferMem: Long = config.getLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) * bufferSize.toLong
val numTaskManager = config.getInteger(
ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
http://git-wip-us.apache.org/repos/asf/flink/blob/939e3fc4/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index ed63db0..e6fcca2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1514,26 +1514,52 @@ object TaskManager {
case x => x
}
- val pageSize = configuration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE)
+ checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+ "Number of task slots must be at least one.")
+
val numNetworkBuffers = configuration.getInteger(
ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS)
- checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
- "Number of task slots must be at least one.")
-
checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)
+
+ val pageSizeNew: Int = configuration.getInteger(
+ ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, -1)
+
+ val pageSizeOld: Int = configuration.getInteger(
+ ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1)
+
+ val pageSize: Int =
+ if (pageSizeNew != -1) {
+ // new page size has been configured
+ checkConfigParameter(pageSizeNew >= DefaultMemoryManager.MIN_PAGE_SIZE, pageSizeNew,
+ ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+ "Minimum memory segment size is " + DefaultMemoryManager.MIN_PAGE_SIZE)
+
+ checkConfigParameter(MathUtils.isPowerOf2(pageSizeNew), pageSizeNew,
+ ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+ "Memory segment size must be a power of 2.")
+
+ pageSizeNew
+ }
+ else if (pageSizeOld == -1) {
+ // nothing has been configured, take the default
+ ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE
+ }
+ else {
+ // old page size has been configured
+ checkConfigParameter(pageSizeOld >= DefaultMemoryManager.MIN_PAGE_SIZE, pageSizeOld,
+ ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
+ "Minimum buffer size is " + DefaultMemoryManager.MIN_PAGE_SIZE)
- checkConfigParameter(pageSize >= DefaultMemoryManager.MIN_PAGE_SIZE, pageSize,
- ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
- "Minimum buffer size is " + DefaultMemoryManager.MIN_PAGE_SIZE)
-
- checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
- ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
- "Buffer size must be a power of 2.")
+ checkConfigParameter(MathUtils.isPowerOf2(pageSizeOld), pageSizeOld,
+ ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
+ "Buffer size must be a power of 2.")
+ pageSizeOld
+ }
+
val tmpDirs = configuration.getString(
ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH)