You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/21 13:50:42 UTC

[4/4] flink git commit: [FLINK-2062] [core] Fix names of memory size config parameter

[FLINK-2062] [core] Fix names of memory size config parameter

The old parameter is still evaluated as a fallback: it is used when it is set and the new one is not.

This closes #703


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/939e3fc4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/939e3fc4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/939e3fc4

Branch: refs/heads/master
Commit: 939e3fc40fbdb05f2a564c241eb2c791e869f2d3
Parents: 7bd7b05
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 20 16:19:22 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 21 12:57:37 2015 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    | 16 ++++++-
 .../minicluster/LocalFlinkMiniCluster.scala     | 24 ++++++++--
 .../flink/runtime/taskmanager/TaskManager.scala | 50 +++++++++++++++-----
 3 files changed, 71 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/939e3fc4/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 6a94c3d..956943a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -137,11 +137,17 @@ public final class ConfigConstants {
 	public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers";
 
 	/**
-	 * The config parameter defining the size of the buffers used in the network stack.
+	 * Deprecated config parameter defining the size of the buffers used in the network stack.
 	 */
+	@Deprecated
 	public static final String TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY = "taskmanager.network.bufferSizeInBytes";
 
 	/**
+	 * Config parameter defining the size of memory buffers used by the network stack and the memory manager.
+	 */
+	public static final String TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY = "taskmanager.memory.segment-size";
+	
+	/**
 	 * The implementation to use for spillable/spilled intermediate results, which have both
 	 * synchronous and asynchronous implementations: "sync" or "async".
 	 */
@@ -255,7 +261,7 @@ public final class ConfigConstants {
 	 */
 	public static final String PATH_HADOOP_CONFIG = "fs.hdfs.hadoopconf";
 	
-	// ------------------------ File System Bahavior ------------------------
+	// ------------------------ File System Behavior ------------------------
 
 	/**
 	 * Key to specify whether the file systems should simply overwrite existing files.
@@ -496,9 +502,15 @@ public final class ConfigConstants {
 	/**
 	 * Default size of network stack buffers.
 	 */
+	@Deprecated
 	public static final int DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE = 32768;
 
 	/**
+	 * Default size of memory segments in the network stack and the memory manager.
+	 */
+	public static final int DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE = 32768;
+
+	/**
 	 * The implementation to use for spillable/spilled intermediate results, which have both
 	 * synchronous and asynchronous implementations: "sync" or "async".
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/939e3fc4/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index d7cd6e9..e2d7cc1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -158,11 +158,25 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
     // set this only if no memory was preconfigured
     if (config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1) == -1) {
 
-      val bufferMem: Long =
-        config.getLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
-          ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) *
-          config.getLong(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
-            ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE)
+      val bufferSizeNew: Int = config.getInteger(
+                                      ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, -1)
+
+      val bufferSizeOld: Int = config.getInteger(
+                                      ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1)
+      val bufferSize: Int =
+        if (bufferSizeNew != -1) {
+          bufferSizeNew
+        }
+        else if (bufferSizeOld == -1) {
+          // nothing has been configured, take the default
+          ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE
+        }
+        else {
+          bufferSizeOld
+        }
+      
+      val bufferMem: Long = config.getLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
+          ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) * bufferSize.toLong
 
       val numTaskManager = config.getInteger(
         ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)

http://git-wip-us.apache.org/repos/asf/flink/blob/939e3fc4/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index ed63db0..e6fcca2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1514,26 +1514,52 @@ object TaskManager {
       case x => x
     }
 
-    val pageSize = configuration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE)
+    checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+      "Number of task slots must be at least one.")
+
     val numNetworkBuffers = configuration.getInteger(
       ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
       ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS)
 
-    checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
-      "Number of task slots must be at least one.")
-
     checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
       ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)
+    
+    val pageSizeNew: Int = configuration.getInteger(
+                                        ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, -1)
+    
+    val pageSizeOld: Int = configuration.getInteger(
+                                        ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1)
+
+    val pageSize: Int =
+      if (pageSizeNew != -1) {
+        // new page size has been configured
+        checkConfigParameter(pageSizeNew >= DefaultMemoryManager.MIN_PAGE_SIZE, pageSizeNew,
+          ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+          "Minimum memory segment size is " + DefaultMemoryManager.MIN_PAGE_SIZE)
+
+        checkConfigParameter(MathUtils.isPowerOf2(pageSizeNew), pageSizeNew,
+          ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+          "Memory segment size must be a power of 2.")
+
+        pageSizeNew
+      }
+      else if (pageSizeOld == -1) {
+        // nothing has been configured, take the default
+        ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE
+      }
+      else {
+        // old page size has been configured
+        checkConfigParameter(pageSizeOld >= DefaultMemoryManager.MIN_PAGE_SIZE, pageSizeOld,
+          ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
+          "Minimum buffer size is " + DefaultMemoryManager.MIN_PAGE_SIZE)
 
-    checkConfigParameter(pageSize >= DefaultMemoryManager.MIN_PAGE_SIZE, pageSize,
-      ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
-      "Minimum buffer size is " + DefaultMemoryManager.MIN_PAGE_SIZE)
-
-    checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
-      ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
-      "Buffer size must be a power of 2.")
+        checkConfigParameter(MathUtils.isPowerOf2(pageSizeOld), pageSizeOld,
+          ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
+          "Buffer size must be a power of 2.")
 
+        pageSizeOld
+      }
+    
     val tmpDirs = configuration.getString(
       ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
       ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH)