Posted to commits@flink.apache.org by se...@apache.org on 2017/05/06 17:47:45 UTC

[01/12] flink git commit: [hotfix] fix typo in error message

Repository: flink
Updated Branches:
  refs/heads/master f37988c19 -> b01d737ae


[hotfix] fix typo in error message


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/546d0aad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/546d0aad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/546d0aad

Branch: refs/heads/master
Commit: 546d0aadd11169294be37a3f4591369a353e3123
Parents: 4e1b48e
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Apr 11 14:20:40 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat May 6 14:31:43 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/common/io/DelimitedInputFormat.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/546d0aad/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 3dc567e..e20f646 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -450,7 +450,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 		}
 		catch (Throwable t) {
 			if (LOG.isErrorEnabled()) {
-				LOG.error("Unexpected problen while getting the file statistics for file '" + this.filePath + "': "
+				LOG.error("Unexpected problem while getting the file statistics for file '" + this.filePath + "': "
 						+ t.getMessage(), t);
 			}
 		} finally {


[03/12] flink git commit: [hotfix] [build] Updated outdated config keys in default flink-conf.yaml

Posted by se...@apache.org.
[hotfix] [build] Updated outdated config keys in default flink-conf.yaml


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/179c64e2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/179c64e2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/179c64e2

Branch: refs/heads/master
Commit: 179c64e2bdc13905f504c209413a464e721a1f54
Parents: f37988c
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 4 11:25:24 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat May 6 14:31:43 2017 +0200

----------------------------------------------------------------------
 flink-dist/src/main/resources/flink-conf.yaml | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/179c64e2/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 446d992..12c90e1 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -165,15 +165,25 @@ jobmanager.web.port: 8081
 # High Availability
 #==============================================================================
 
+# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
+#
+# high-availability: zookeeper
+
+# The path where metadata for master recovery is persisted. While ZooKeeper stored
+# the small ground truth for checkpoint and leader election, this location stores
+# the larger objects, like persisted dataflow graphs.
+# 
+# Must be a durable file system that is accessible from all nodes
+# (like HDFS, S3, Ceph, nfs, ...) 
+#
+# high-availability.storageDir: hdfs:///flink/ha/
+
 # The list of ZooKeeper quorum peers that coordinate the high-availability
 # setup. This must be a list of the form:
-#
 # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
 #
-
-# high-availability: zookeeper
 # high-availability.zookeeper.quorum: localhost:2181
-# high-availability.zookeeper.storageDir: hdfs:///flink/ha/
+
 
 # ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
 # It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
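
For reference, the updated keys above combine into a high-availability section along these lines. This is a minimal sketch built only from the commented defaults shown in the diff; the quorum address and storage path are placeholders for your own setup.

    # flink-conf.yaml (excerpt): ZooKeeper-based high availability
    high-availability: zookeeper
    high-availability.zookeeper.quorum: localhost:2181
    # durable storage, accessible from all nodes, for the larger recovery metadata
    high-availability.storageDir: hdfs:///flink/ha/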


[09/12] flink git commit: [FLINK-6443] [docs] Add more links to concepts docs

Posted by se...@apache.org.
[FLINK-6443] [docs] Add more links to concepts docs

This closes #3822


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b01d737a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b01d737a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b01d737a

Branch: refs/heads/master
Commit: b01d737ae1452dbfafd4696ff14d52dce5b60efd
Parents: 6c48f9b
Author: David Anderson <da...@alpinegizmo.com>
Authored: Thu May 4 11:27:33 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat May 6 19:41:53 2017 +0200

----------------------------------------------------------------------
 docs/concepts/programming-model.md | 22 +++++++++++++++++-----
 docs/concepts/runtime.md           | 16 ++++++++--------
 2 files changed, 25 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b01d737a/docs/concepts/programming-model.md
----------------------------------------------------------------------
diff --git a/docs/concepts/programming-model.md b/docs/concepts/programming-model.md
index 3d2aebb..d83cf00 100644
--- a/docs/concepts/programming-model.md
+++ b/docs/concepts/programming-model.md
@@ -48,7 +48,7 @@ Flink offers different levels of abstraction to develop streaming/batch applicat
     for certain operations only. The *DataSet API* offers additional primitives on bounded data sets, like loops/iterations.
 
   - The **Table API** is a declarative DSL centered around *tables*, which may be dynamically changing tables (when representing streams).
-    The Table API follows the (extended) relational model: Tables have a schema attached (similar to tables in relational databases)
+    The [Table API](../dev/table_api.html) follows the (extended) relational model: Tables have a schema attached (similar to tables in relational databases)
     and the API offers comparable operations, such as select, project, join, group-by, aggregate, etc.
     Table API programs declaratively define *what logical operation should be done* rather than specifying exactly
    *how the code for the operation looks*. Though the Table API is extensible by various types of user-defined
@@ -60,7 +60,7 @@ Flink offers different levels of abstraction to develop streaming/batch applicat
 
   - The highest level abstraction offered by Flink is **SQL**. This abstraction is similar to the *Table API* both in semantics and
     expressiveness, but represents programs as SQL query expressions.
-    The SQL abstraction closely interacts with the Table API, and SQL queries can be executed over tables defined in the *Table API*.
+    The [SQL](../dev/table_api.html#sql) abstraction closely interacts with the Table API, and SQL queries can be executed over tables defined in the *Table API*.
 
 
 ## Programs and Dataflows
@@ -81,6 +81,9 @@ arbitrary **directed acyclic graphs** *(DAGs)*. Although special forms of cycles
 Often there is a one-to-one correspondence between the transformations in the programs and the operators
 in the dataflow. Sometimes, however, one transformation may consist of multiple transformation operators.
 
+Sources and sinks are documented in the [streaming connectors](../dev/connectors/index.html) and [batch connectors](../dev/batch/connectors.html) docs.
+Transformations are documented in [DataStream transformations](../dev/datastream_api.html#datastream-transformations) and [DataSet transformations](../dev/batch/dataset_transformations.html).
+
 {% top %}
 
 ## Parallel Dataflows
@@ -112,6 +115,8 @@ Streams can transport data between two operators in a *one-to-one* (or *forwardi
     is preserved, but the parallelism does introduce non-determinism regarding the order in
     which the aggregated results for different keys arrive at the sink.
 
+Details about configuring and controlling parallelism can be found in the docs on [parallel execution](../dev/parallel.html).
+
 {% top %}
 
 ## Windows
@@ -128,6 +133,7 @@ One typically distinguishes different types of windows, such as *tumbling window
 <img src="../fig/windows.svg" alt="Time- and Count Windows" class="offset" width="80%" />
 
 More window examples can be found in this [blog post](https://flink.apache.org/news/2015/12/04/Introducing-windows.html).
+More details are in the [window docs](../dev/windows.html).
 
 {% top %}
 
@@ -165,6 +171,8 @@ This alignment also allows Flink to redistribute the state and adjust the stream
 
 <img src="../fig/state_partitioning.svg" alt="State and Partitioning" class="offset" width="50%" />
 
+For more information, see the documentation on [working with state](../dev/stream/state.html).
+
 {% top %}
 
 ## Checkpoints for Fault Tolerance
@@ -178,17 +186,21 @@ point of the checkpoint.
 The checkpoint interval is a means of trading off the overhead of fault tolerance during execution with the recovery time (the number
 of events that need to be replayed).
 
-More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{ site.baseurl }}/internals/stream_checkpointing.html).
+The description of the [fault tolerance internals]({{ site.baseurl }}/internals/stream_checkpointing.html) provides
+more information about how Flink manages checkpoints and related topics.
+Details about enabling and configuring checkpointing are in the [checkpointing API docs](../dev/stream/checkpointing.html).
+
 
 {% top %}
 
 ## Batch on Streaming
 
-Flink executes batch programs as a special case of streaming programs, where the streams are bounded (finite number of elements).
+Flink executes [batch programs](../dev/batch/index.html) as a special case of streaming programs, where the streams are bounded (finite number of elements).
 A *DataSet* is treated internally as a stream of data. The concepts above thus apply to batch programs in the
 same way as well as they apply to streaming programs, with minor exceptions:
 
-  - Programs in the DataSet API do not use checkpoints. Recovery happens by fully replaying the streams.
+  - [Fault tolerance for batch programs](../dev/batch/fault_tolerance.html) does not use checkpointing.
+    Recovery happens by fully replaying the streams.
     That is possible, because inputs are bounded. This pushes the cost more towards the recovery,
     but makes the regular processing cheaper, because it avoids checkpoints.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b01d737a/docs/concepts/runtime.md
----------------------------------------------------------------------
diff --git a/docs/concepts/runtime.md b/docs/concepts/runtime.md
index 0d4e017..c598b12 100644
--- a/docs/concepts/runtime.md
+++ b/docs/concepts/runtime.md
@@ -31,7 +31,7 @@ under the License.
 For distributed execution, Flink *chains* operator subtasks together into *tasks*. Each task is executed by one thread.
 Chaining operators together into tasks is a useful optimization: it reduces the overhead of thread-to-thread
 handover and buffering, and increases overall throughput while decreasing latency.
-The chaining behavior can be configured in the APIs.
+The chaining behavior can be configured; see the [chaining docs](../dev/datastream_api.html#task-chaining-and-resource-groups) for details.
 
 The sample dataflow in the figure below is executed with five subtasks, and hence with five parallel threads.
 
@@ -54,9 +54,9 @@ The Flink runtime consists of two types of processes:
 
     There must always be at least one TaskManager.
 
-The JobManagers and TaskManagers can be started in various ways: directly on the machines, in
-containers, or managed by resource frameworks like YARN. TaskManagers connect to JobManagers, announcing
-themselves as available, and are assigned work.
+The JobManagers and TaskManagers can be started in various ways: directly on the machines as a [standalone cluster](../setup/cluster_setup.html), in
+containers, or managed by resource frameworks like [YARN](../setup/yarn_setup.html) or [Mesos](../setup/mesos.html).
+TaskManagers connect to JobManagers, announcing themselves as available, and are assigned work.
 
 The **client** is not part of the runtime and program execution, but is used to prepare and send a dataflow to the JobManager.
 After that, the client can disconnect, or stay connected to receive progress reports. The client runs either as part of the
@@ -98,7 +98,7 @@ job. Allowing this *slot sharing* has two main benefits:
 
 <img src="../fig/slot_sharing.svg" alt="TaskManagers with shared Task Slots" class="offset" width="80%" />
 
-The APIs also include a *resource group* mechanism which can be used to prevent undesirable slot sharing. 
+The APIs also include a *[resource group](../dev/datastream_api.html#task-chaining-and-resource-groups)* mechanism which can be used to prevent undesirable slot sharing. 
 
 As a rule-of-thumb, a good default number of task slots would be the number of CPU cores.
 With hyper-threading, each slot then takes 2 or more hardware thread contexts.
@@ -107,7 +107,7 @@ With hyper-threading, each slot then takes 2 or more hardware thread contexts.
 
 ## State Backends
 
-The exact data structures in which the key/values indexes are stored depends on the chosen **state backend**. One state backend
+The exact data structures in which the key/values indexes are stored depends on the chosen [state backend](../ops/state_backends.html). One state backend
 stores data in an in-memory hash map, another state backend uses [RocksDB](http://rocksdb.org) as the key/value store.
 In addition to defining the data structure that holds the state, the state backends also implement the logic to
 take a point-in-time snapshot of the key/value state and store that snapshot as part of a checkpoint.
@@ -120,8 +120,8 @@ take a point-in-time snapshot of the key/value state and store that snapshot as
 
 Programs written in the Data Stream API can resume execution from a **savepoint**. Savepoints allow both updating your programs and your Flink cluster without losing any state. 
 
-Savepoints are **manually triggered checkpoints**, which take a snapshot of the program and write it out to a state backend. They rely on the regular checkpointing mechanism for this. During execution programs are periodically snapshotted on the worker nodes and produce checkpoints. For recovery only the last completed checkpoint is needed and older checkpoints can be safely discarded as soon as a new one is completed.
+[Savepoints](..//setup/savepoints.html) are **manually triggered checkpoints**, which take a snapshot of the program and write it out to a state backend. They rely on the regular checkpointing mechanism for this. During execution programs are periodically snapshotted on the worker nodes and produce checkpoints. For recovery only the last completed checkpoint is needed and older checkpoints can be safely discarded as soon as a new one is completed.
 
-Savepoints are similar to these periodic checkpoints except that they are **triggered by the user** and **don't automatically expire** when newer checkpoints are completed.
+Savepoints are similar to these periodic checkpoints except that they are **triggered by the user** and **don't automatically expire** when newer checkpoints are completed. Savepoints can be created from the [command line](../setup/cli.html#savepoints) or when cancelling a job via the [REST API](../monitoring/rest_api.html#cancel-job-with-savepoint).
 
 {% top %}
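
As a point of reference for the savepoint links added above, savepoints are typically triggered via the command-line client. A hedged sketch of the usual invocations (job ID and target directory are placeholders):

    # trigger a savepoint for a running job
    ./bin/flink savepoint <jobId> [targetDirectory]

    # cancel a job and take a savepoint in one step
    ./bin/flink cancel -s [targetDirectory] <jobId>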


[06/12] flink git commit: [FLINK-4545] [network] replace the network buffers parameter

Posted by se...@apache.org.
[FLINK-4545] [network] replace the network buffers parameter

Instead, allow the configuration with the following three new (more flexible)
parameters:
 * "taskmanager.network.memory.fraction": fraction of JVM memory to use for network buffers (default: 0.1)
 * "taskmanager.network.memory.min": minimum memory size for network buffers (default: 64 MB)
 * "taskmanager.network.memory.max": maximum memory size for network buffers (default: 1 GB)

This closes #3721
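
For illustration, the three new parameters map onto flink-conf.yaml entries as follows. This is a minimal sketch that simply spells out the defaults listed above (64 MB = 67108864 bytes, 1 GB = 1073741824 bytes); the values should be tuned per setup.

    # flink-conf.yaml (excerpt): network buffer memory (replaces taskmanager.network.numberOfBuffers)
    taskmanager.network.memory.fraction: 0.1   # fraction of JVM memory used for network buffers
    taskmanager.network.memory.min: 67108864   # lower bound in bytes (64 MB)
    taskmanager.network.memory.max: 1073741824 # upper bound in bytes (1 GB)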


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bb49e53
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bb49e53
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bb49e53

Branch: refs/heads/master
Commit: 0bb49e538c118b8265377355a9667789a3971966
Parents: ac72450
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Apr 6 14:41:52 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat May 6 19:40:29 2017 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |  80 ++++-
 docs/setup/yarn_setup.md                        |   2 +-
 .../addons/hbase/HBaseConnectorITCase.java      |  37 +++
 .../flink/configuration/TaskManagerOptions.java |  20 ++
 flink-dist/pom.xml                              |  24 ++
 flink-dist/src/main/flink-bin/bin/config.sh     | 133 ++++++++
 .../src/main/flink-bin/bin/taskmanager.sh       |  31 +-
 flink-dist/src/test/bin/calcTMHeapSizeMB.sh     |  42 +++
 flink-dist/src/test/bin/calcTMNetBufMem.sh      |  39 +++
 ...kManagerHeapSizeCalculationJavaBashTest.java | 306 +++++++++++++++++++
 .../src/test/resources/log4j-test.properties    |  35 +++
 .../clusterframework/BootstrapTools.java        |   2 +-
 .../ContaineredTaskManagerParameters.java       |  26 +-
 .../io/network/buffer/NetworkBufferPool.java    |   9 +-
 .../partition/SpillableSubpartition.java        |   6 +-
 .../minicluster/MiniClusterConfiguration.java   |   9 +-
 .../taskexecutor/TaskManagerServices.java       | 235 +++++++++++++-
 .../TaskManagerServicesConfiguration.java       | 114 ++++++-
 .../minicluster/LocalFlinkMiniCluster.scala     |  11 +-
 .../NetworkEnvironmentConfiguration.scala       |   4 +-
 .../TaskManagerServicesConfigurationTest.java   | 111 +++++++
 .../taskexecutor/TaskManagerServicesTest.java   | 289 ++++++++++++++++++
 ...askManagerComponentsStartupShutdownTest.java |   9 +-
 ...tractEventTimeWindowCheckpointingITCase.java |   2 +
 .../org/apache/flink/yarn/cli/FlinkYarnCLI.java |   2 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |   2 +-
 26 files changed, 1477 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index a3daac0..c4a7354 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -82,13 +82,13 @@ without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to
 
 ### Managed Memory
 
-By default, Flink allocates a fraction of `0.7` of the total memory configured via `taskmanager.heap.mb` for its managed memory. Managed memory helps Flink to run the batch operators efficiently. It prevents `OutOfMemoryException`s because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system.
+By default, Flink allocates a fraction of `0.7` of the free memory (total memory configured via `taskmanager.heap.mb` minus memory used for network buffers) for its managed memory. Managed memory helps Flink to run the batch operators efficiently. It prevents `OutOfMemoryException`s because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system.
 
 The default fraction for managed memory can be adjusted using the `taskmanager.memory.fraction` parameter. An absolute value may be set using `taskmanager.memory.size` (overrides the fraction parameter). If desired, the managed memory may be allocated outside the JVM heap. This may improve performance in setups with large memory sizes.
 
 - `taskmanager.memory.size`: The amount of memory (in megabytes) that the task manager reserves on-heap or off-heap (depending on `taskmanager.memory.off-heap`) for sorting, hash tables, and caching of intermediate results. If unspecified (-1), the memory manager will take a fixed ratio with respect to the size of the task manager JVM as specified by `taskmanager.memory.fraction`. (DEFAULT: -1)
 
-- `taskmanager.memory.fraction`: The relative amount of memory (with respect to `taskmanager.heap.mb`) that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of `0.8` means that a task manager reserves 80% of its memory (on-heap or off-heap depending on `taskmanager.memory.off-heap`) for internal data buffers, leaving 20% of free memory for the task manager's heap for objects created by user-defined functions. (DEFAULT: 0.7) This parameter is only evaluated, if `taskmanager.memory.size` is not set.
+- `taskmanager.memory.fraction`: The relative amount of memory (with respect to `taskmanager.heap.mb`, after subtracting the amount of memory used by network buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of `0.8` means that a task manager reserves 80% of its memory (on-heap or off-heap depending on `taskmanager.memory.off-heap`) for internal data buffers, leaving 20% of free memory for the task manager's heap for objects created by user-defined functions. (DEFAULT: 0.7) This parameter is only evaluated, if `taskmanager.memory.size` is not set.
 
 - `taskmanager.memory.off-heap`: If set to `true`, the task manager allocates memory which is used for sorting, hash tables, and caching of intermediate results outside of the JVM heap. For setups with larger quantities of memory, this can improve the efficiency of the operations performed on the memory (DEFAULT: false).
 
@@ -174,7 +174,11 @@ will be used under the directory specified by jobmanager.web.tmpdir.
 
 - `fs.output.always-create-directory`: File writers running with a parallelism larger than one create a directory for the output file path and put the different result files (one per parallel writer task) into that directory. If this option is set to *true*, writers with a parallelism of 1 will also create a directory and place a single result file into it. If the option is set to *false*, the writer will directly create the file directly at the output path, without creating a containing directory. (DEFAULT: false)
 
-- `taskmanager.network.numberOfBuffers`: The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: 2048).
+- `taskmanager.network.memory.fraction`: Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value or the min/max values below. (DEFAULT: 0.1)
+
+- `taskmanager.network.memory.min`: Minimum memory size for network buffers in bytes (DEFAULT: 64 MB)
+
+- `taskmanager.network.memory.max`: Maximum memory size for network buffers in bytes (DEFAULT: 1 GB)
 
 - `state.backend`: The backend that will be used to store operator state checkpoints if checkpointing is enabled. Supported backends:
    -  `jobmanager`: In-memory state, backup to JobManager's/ZooKeeper's memory. Should be used only for minimal state (Kafka offsets) or testing and local debugging.
@@ -257,11 +261,17 @@ The following parameters configure Flink's JobManager and TaskManagers.
 
 - `taskmanager.tmp.dirs`: The directory for temporary files, or a list of directories separated by the system's directory delimiter (for example ':' (colon) on Linux/Unix). If multiple directories are specified, then the temporary files will be distributed across the directories in a round robin fashion. The I/O manager component will spawn one reading and one writing thread per directory. A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: **The system's tmp dir**).
 
-- `taskmanager.network.numberOfBuffers`: The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: **2048**).
+- `taskmanager.network.memory.fraction`: Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value or the min/max values below. Also note, that `taskmanager.network.memory.min` and `taskmanager.network.memory.max` may override this fraction. (DEFAULT: **0.1**)
+
+- `taskmanager.network.memory.min`: Minimum memory size for network buffers in bytes (DEFAULT: **64 MB**). Previously, this was determined from `taskmanager.network.numberOfBuffers` and `taskmanager.memory.segment-size`.
+
+- `taskmanager.network.memory.max`: Maximum memory size for network buffers in bytes (DEFAULT: **1 GB**). Previously, this was determined from `taskmanager.network.numberOfBuffers` and `taskmanager.memory.segment-size`.
+
+- `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three parameters above): The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: **2048**). If set, it will be mapped to `taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on `taskmanager.memory.segment-size`.
 
 - `taskmanager.memory.size`: The amount of memory (in megabytes) that the task manager reserves on the JVM's heap space for sorting, hash tables, and caching of intermediate results. If unspecified (-1), the memory manager will take a fixed ratio of the heap memory available to the JVM, as specified by `taskmanager.memory.fraction`. (DEFAULT: **-1**)
 
-- `taskmanager.memory.fraction`: The relative amount of memory that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of 0.8 means that TaskManagers reserve 80% of the JVM's heap space for internal data buffers, leaving 20% of the JVM's heap space free for objects created by user-defined functions. (DEFAULT: **0.7**) This parameter is only evaluated, if `taskmanager.memory.size` is not set.
+- `taskmanager.memory.fraction`: The relative amount of memory (with respect to `taskmanager.heap.mb`, after subtracting the amount of memory used by network buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of `0.8` means that a task manager reserves 80% of its memory (on-heap or off-heap depending on `taskmanager.memory.off-heap`) for internal data buffers, leaving 20% of free memory for the task manager's heap for objects created by user-defined functions. (DEFAULT: 0.7) This parameter is only evaluated, if `taskmanager.memory.size` is not set.
 
 - `taskmanager.debug.memory.startLogThread`: Causes the TaskManagers to periodically log memory and Garbage collection statistics. The statistics include current heap-, off-heap, and other memory pool utilization, as well as the time spent on garbage collection, by heap memory pool.
 
@@ -614,9 +624,54 @@ You have to configure `jobmanager.archive.fs.dir` in order to archive terminated
 
 ## Background
 
+
 ### Configuring the Network Buffers
 
-If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, please use the following formula to adjust the number of network buffers:
+If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, you
+need to adapt the amount of memory used for network buffers in order for your program to run on your
+task managers.
+
+Network buffers are a critical resource for the communication layers. They are used to buffer
+records before transmission over a network, and to buffer incoming data before dissecting it into
+records and handing them to the application. A sufficient number of network buffers is critical to
+achieve a good throughput.
+
+<div class="alert alert-info">
+Since Flink 1.3, you may follow the idiom "more is better" without any penalty on the latency (we
+prevent excessive buffering in each outgoing and incoming channel, i.e. *buffer bloat*, by limiting
+the actual number of buffers used by each channel).
+</div>
+
+In general, configure the task manager to have enough buffers that each logical network connection
+you expect to be open at the same time has a dedicated buffer. A logical network connection exists
+for each point-to-point exchange of data over the network, which typically happens at
+repartitioning or broadcasting steps (shuffle phase). In those, each parallel task inside the
+TaskManager has to be able to talk to all other parallel tasks.
+
+#### Setting Memory Fractions
+
+Previously, the number of network buffers was set manually which became a quite error-prone task
+(see below). Since Flink 1.3, it is possible to define a fraction of memory that is being used for
+network buffers with the following configuration parameters:
+
+- `taskmanager.network.memory.fraction`: Fraction of JVM memory to use for network buffers (DEFAULT: 0.1),
+- `taskmanager.network.memory.min`: Minimum memory size for network buffers in bytes (DEFAULT: 64 MB),
+- `taskmanager.network.memory.max`: Maximum memory size for network buffers in bytes (DEFAULT: 1 GB), and
+- `taskmanager.memory.segment-size`: Size of memory buffers used by the memory manager and the
+network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)).
+
+#### Setting the Number of Network Buffers directly
+
+<div class="alert alert-warning">
+  <strong>Note:</strong> This way of configuring the amount of memory used for network buffers is deprecated. Please consider using the method above by defining a fraction of memory to use.
+</div>
+
+The required number of buffers on a task manager is
+*total-degree-of-parallelism* (number of targets) \* *intra-node-parallelism* (number of sources in one task manager) \* *n*
+with *n* being a constant that defines how many repartitioning-/broadcasting steps you expect to be
+active at the same time. Since the *intra-node-parallelism* is typically the number of cores, and
+more than 4 repartitioning or broadcasting channels are rarely active in parallel, it frequently
+boils down to
 
 ```
 #slots-per-TM^2 * #TMs * 4
@@ -624,16 +679,11 @@ If you ever see the Exception `java.io.IOException: Insufficient number of netwo
 
 Where `#slots per TM` are the [number of slots per TaskManager](#configuring-taskmanager-processing-slots) and `#TMs` are the total number of task managers.
 
-Network buffers are a critical resource for the communication layers. They are used to buffer records before transmission over a network, and to buffer incoming data before dissecting it into records and handing them to the
-application. A sufficient number of network buffers is critical to achieve a good throughput.
-
-In general, configure the task manager to have enough buffers that each logical network connection you expect to be open at the same time has a dedicated buffer. A logical network connection exists for each point-to-point exchange of data over the network, which typically happens at repartitioning- or broadcasting steps (shuffle phase). In those, each parallel task inside the TaskManager has to be able to talk to all other parallel tasks. Hence, the required number of buffers on a task manager is *total-degree-of-parallelism* (number of targets) \* *intra-node-parallelism* (number of sources in one task manager) \* *n*. Here, *n* is a constant that defines how many repartitioning-/broadcasting steps you expect to be active at the same time.
-
-Since the *intra-node-parallelism* is typically the number of cores, and more than 4 repartitioning or broadcasting channels are rarely active in parallel, it frequently boils down to `#slots-per-TM^2 * #TMs * 4`.
-
-To support for example a cluster of 20 8-slot machines, you should use roughly 5000 network buffers for optimal throughput.
+To support, for example, a cluster of 20 8-slot machines, you should use roughly 5000 network
+buffers for optimal throughput.
 
-Each network buffer has by default a size of 32 KiBytes. In the above example, the system would allocate roughly 300 MiBytes for network buffers.
+Each network buffer has by default a size of 32 KiBytes. In the example above, the system would thus
+allocate roughly 300 MiBytes for network buffers.
 
 The number and size of network buffers can be configured with the following parameters:
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/docs/setup/yarn_setup.md
----------------------------------------------------------------------
diff --git a/docs/setup/yarn_setup.md b/docs/setup/yarn_setup.md
index 3149ec2..1ce45ad 100644
--- a/docs/setup/yarn_setup.md
+++ b/docs/setup/yarn_setup.md
@@ -122,7 +122,7 @@ The system will use the configuration in `conf/flink-conf.yaml`. Please follow o
 
 Flink on YARN will overwrite the following configuration parameters `jobmanager.rpc.address` (because the JobManager is always allocated at different machines), `taskmanager.tmp.dirs` (we are using the tmp directories given by YARN) and `parallelism.default` if the number of slots has been specified.
 
-If you don't want to change the configuration file to set configuration parameters, there is the option to pass dynamic properties via the `-D` flag. So you can pass parameters this way: `-Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368`.
+If you don't want to change the configuration file to set configuration parameters, there is the option to pass dynamic properties via the `-D` flag. So you can pass parameters this way: `-Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624`.
 
 The example invocation starts 11 containers (even though only 10 containers were requested), since there is one additional container for the ApplicationMaster and Job Manager.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
index c1aa9a0..33bbe12 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
@@ -22,7 +22,11 @@ package org.apache.flink.addons.hbase;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
@@ -36,6 +40,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -74,6 +79,12 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
 	public static void activateHBaseCluster() throws IOException {
 		registerHBaseMiniClusterInClasspath();
 		prepareTable();
+		LimitNetworkBuffersTestEnvironment.setAsContext();
+	}
+
+	@AfterClass
+	public static void resetExecutionEnvironmentFactory() {
+		LimitNetworkBuffersTestEnvironment.unsetAsContext();
 	}
 
 	private static void prepareTable() throws IOException {
@@ -335,4 +346,30 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
 	}
 
 
+	/**
+	 * Allows the tests to use {@link ExecutionEnvironment#getExecutionEnvironment()} but with a
+	 * configuration that limits the maximum memory used for network buffers since the current
+	 * defaults are too high for Travis-CI.
+	 */
+	private static abstract class LimitNetworkBuffersTestEnvironment extends ExecutionEnvironment {
+
+		public static void setAsContext() {
+			Configuration config = new Configuration();
+			// the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
+			config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 80L << 20); // 80 MB
+			final LocalEnvironment le = new LocalEnvironment(config);
+
+			initializeContextEnvironment(new ExecutionEnvironmentFactory() {
+				@Override
+				public ExecutionEnvironment createExecutionEnvironment() {
+					return le;
+				}
+			});
+		}
+
+		public static void unsetAsContext() {
+			resetContextEnvironment();
+		}
+	}
+
 }
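
Outside of tests, a similar limit can be applied by handing a custom Configuration to a local environment. This is a hypothetical sketch, not part of the commit; it assumes ExecutionEnvironment.createLocalEnvironment(Configuration) from the DataSet API and reuses the option set in the test above.

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.TaskManagerOptions;

    public class LimitNetworkMemoryExample {
        public static void main(String[] args) throws Exception {
            Configuration config = new Configuration();
            // cap network buffer memory at 80 MB, mirroring the test configuration above
            config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 80L << 20);

            // local environment that picks up the custom configuration
            ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(config);
            env.fromElements(1, 2, 3).print();
        }
    }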

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index e915c0b..c5063d1 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -85,11 +85,30 @@ public class TaskManagerOptions {
 	/**
 	 * Number of buffers used in the network stack. This defines the number of possible tasks and
 	 * shuffles.
+	 *
+	 * @deprecated use {@link #NETWORK_BUFFERS_MEMORY_FRACTION}, {@link #NETWORK_BUFFERS_MEMORY_MIN},
+	 * and {@link #NETWORK_BUFFERS_MEMORY_MAX} instead
 	 */
+	@Deprecated
 	public static final ConfigOption<Integer> NETWORK_NUM_BUFFERS =
 			key("taskmanager.network.numberOfBuffers")
 			.defaultValue(2048);
 
+	/** Fraction of JVM memory to use for network buffers. */
+	public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION =
+			key("taskmanager.network.memory.fraction")
+			.defaultValue(0.1f);
+
+	/** Minimum memory size for network buffers (in bytes) */
+	public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MIN =
+			key("taskmanager.network.memory.min")
+			.defaultValue(64L << 20); // 64 MB
+
+	/** Maximum memory size for network buffers (in bytes) */
+	public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MAX =
+			key("taskmanager.network.memory.max")
+			.defaultValue(1024L << 20); // 1 GB
+
 
 	/** Minimum backoff for partition requests of input channels. */
 	public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
@@ -101,6 +120,7 @@ public class TaskManagerOptions {
 			key("taskmanager.net.request-backoff.max")
 			.defaultValue(10000);
 
+
 	/**
 	 * Number of network buffers to use for each outgoing/ingoing channel (subpartition/input channel).
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index ff87d53..5f6a39b 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -254,6 +254,16 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 		<!-- end optional Flink libraries -->
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-junit</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<!-- end test dependencies -->
 	</dependencies>
 
 	<profiles>
@@ -312,6 +322,20 @@ under the License.
 	<build>
 		<plugins>
 
+			<!--unit tests-->
+			<!--plugin must appear BEFORE the shade-plugin to not mess up package order and include the non-uber JAR into the assembly-->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<reuseForks>false</reuseForks>
+					<!-- <workingDirectory>${project.build.testOutputDirectory}</workingDirectory> -->
+					<systemPropertyVariables>
+						<log.level>WARN</log.level>
+					</systemPropertyVariables>
+				</configuration>
+			</plugin>
+
 			<!-- binary compatibility checks -->
 			<plugin>
 				<groupId>com.github.siom79.japicmp</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 0894481..7e3c1d4 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -108,6 +108,11 @@ KEY_TASKM_MEM_MANAGED_FRACTION="taskmanager.memory.fraction"
 KEY_TASKM_OFFHEAP="taskmanager.memory.off-heap"
 KEY_TASKM_MEM_PRE_ALLOCATE="taskmanager.memory.preallocate"
 
+KEY_TASKM_NET_BUF_FRACTION="taskmanager.network.memory.fraction"
+KEY_TASKM_NET_BUF_MIN="taskmanager.network.memory.min"
+KEY_TASKM_NET_BUF_MAX="taskmanager.network.memory.max"
+KEY_TASKM_NET_BUF_NR="taskmanager.network.numberOfBuffers" # fallback
+
 KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"
 
 KEY_ENV_PID_DIR="env.pid.dir"
@@ -231,6 +236,31 @@ if [ -z "${FLINK_TM_MEM_PRE_ALLOCATE}" ]; then
     FLINK_TM_MEM_PRE_ALLOCATE=$(readFromConfig ${KEY_TASKM_MEM_PRE_ALLOCATE} "false" "${YAML_CONF}")
 fi
 
+
+# Define FLINK_TM_NET_BUF_FRACTION if it is not already set
+if [ -z "${FLINK_TM_NET_BUF_FRACTION}" ]; then
+    FLINK_TM_NET_BUF_FRACTION=$(readFromConfig ${KEY_TASKM_NET_BUF_FRACTION} 0.1 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_NET_BUF_MIN and FLINK_TM_NET_BUF_MAX if not already set (as a fallback)
+if [ -z "${FLINK_TM_NET_BUF_MIN}" -a -z "${FLINK_TM_NET_BUF_MAX}" ]; then
+    FLINK_TM_NET_BUF_MIN=$(readFromConfig ${KEY_TASKM_NET_BUF_NR} -1 "${YAML_CONF}")
+    FLINK_TM_NET_BUF_MAX=${FLINK_TM_NET_BUF_MIN}
+fi
+
+# Define FLINK_TM_NET_BUF_MIN if it is not already set
+if [ -z "${FLINK_TM_NET_BUF_MIN}" -o "${FLINK_TM_NET_BUF_MIN}" = "-1" ]; then
+    # default: 64MB = 67108864 bytes (same as the previous default with 2048 buffers of 32k each)
+    FLINK_TM_NET_BUF_MIN=$(readFromConfig ${KEY_TASKM_NET_BUF_MIN} 67108864 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_NET_BUF_MAX if it is not already set
+if [ -z "${FLINK_TM_NET_BUF_MAX}" -o "${FLINK_TM_NET_BUF_MAX}" = "-1" ]; then
+    # default: 1GB = 1073741824 bytes
+    FLINK_TM_NET_BUF_MAX=$(readFromConfig ${KEY_TASKM_NET_BUF_MAX} 1073741824 "${YAML_CONF}")
+fi
+
+
 # Verify that NUMA tooling is available
 command -v numactl >/dev/null 2>&1
 if [[ $? -ne 0 ]]; then
@@ -463,3 +493,106 @@ TMSlaves() {
 useOffHeapMemory() {
     [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
 }
+
+HAVE_AWK=
+# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config)
+calculateNetworkBufferMemory() {
+    local network_buffers_bytes
+    if [ "${FLINK_TM_HEAP}" -le "0" ]; then
+        echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
+        exit 1
+    fi
+
+    if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
+        # fix memory size for network buffers
+        network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
+    else
+        if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then
+            echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
+            echo "Min must be less than or equal to max."
+            echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
+            exit 1
+        fi
+
+        # Bash only performs integer arithmetic so floating point computation is performed using awk
+        if [[ -z "${HAVE_AWK}" ]] ; then
+            command -v awk >/dev/null 2>&1
+            if [[ $? -ne 0 ]]; then
+                echo "[ERROR] Program 'awk' not found."
+                echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+                exit 1
+            fi
+            HAVE_AWK=true
+        fi
+
+        # We calculate the memory using a fraction of the total memory
+        if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
+            echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
+            echo "It must be between 0.0 and 1.0."
+            echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+            exit 1
+        fi
+
+        network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
+    fi
+
+    # recalculate the JVM heap memory by taking the network buffers into account
+    local tm_heap_size_bytes=$((${FLINK_TM_HEAP} << 20)) # megabytes to bytes
+    if [[ "${tm_heap_size_bytes}" -le "${network_buffers_bytes}" ]]; then
+        echo "[ERROR] Configured TaskManager memory size (${FLINK_TM_HEAP} MB, from '${KEY_TASKM_MEM_SIZE}') must be larger than the network buffer memory size (${network_buffers_bytes} bytes, from: '${KEY_TASKM_NET_BUF_FRACTION}', '${KEY_TASKM_NET_BUF_MIN}', '${KEY_TASKM_NET_BUF_MAX}', and '${KEY_TASKM_NET_BUF_NR}')."
+        exit 1
+    fi
+
+    echo ${network_buffers_bytes}
+}
+
+# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)
+calculateTaskManagerHeapSizeMB() {
+    if [ "${FLINK_TM_HEAP}" -le "0" ]; then
+        echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
+        exit 1
+    fi
+
+    local tm_heap_size_mb=${FLINK_TM_HEAP}
+
+    if useOffHeapMemory; then
+
+        local network_buffers_mb=$(($(calculateNetworkBufferMemory) >> 20)) # bytes to megabytes
+        tm_heap_size_mb=$((tm_heap_size_mb - network_buffers_mb))
+
+        if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
+            # We split up the total memory in heap and off-heap memory
+            if [[ "${tm_heap_size_mb}" -le "${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
+                echo "[ERROR] Remaining TaskManager memory size (${tm_heap_size_mb} MB, from: '${KEY_TASKM_MEM_SIZE}' (${FLINK_TM_HEAP} MB) minus network buffer memory size (${network_buffers_mb} MB, from: '${KEY_TASKM_NET_BUF_FRACTION}', '${KEY_TASKM_NET_BUF_MIN}', '${KEY_TASKM_NET_BUF_MAX}', and '${KEY_TASKM_NET_BUF_NR}')) must be larger than the managed memory size (${FLINK_TM_MEM_MANAGED_SIZE} MB, from: '${KEY_TASKM_MEM_MANAGED_SIZE}')."
+                exit 1
+            fi
+
+            tm_heap_size_mb=$((tm_heap_size_mb - FLINK_TM_MEM_MANAGED_SIZE))
+        else
+            # Bash only performs integer arithmetic so floating point computation is performed using awk
+            if [[ -z "${HAVE_AWK}" ]] ; then
+                command -v awk >/dev/null 2>&1
+                if [[ $? -ne 0 ]]; then
+                    echo "[ERROR] Program 'awk' not found."
+                    echo "Please install 'awk' or define '${KEY_TASKM_MEM_MANAGED_SIZE}' instead of '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}."
+                    exit 1
+                fi
+                HAVE_AWK=true
+            fi
+
+            # We calculate the memory using a fraction of the total memory
+            if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_MEM_MANAGED_FRACTION}"` != "1" ]]; then
+                echo "[ERROR] Configured TaskManager managed memory fraction '${FLINK_TM_MEM_MANAGED_FRACTION}' is not a valid value."
+                echo "It must be between 0.0 and 1.0."
+                echo "Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}."
+                exit 1
+            fi
+
+            # recalculate the JVM heap memory by taking the off-heap ratio into account
+            local offheap_managed_memory_size=`awk "BEGIN { printf \"%.0f\n\", ${tm_heap_size_mb} * ${FLINK_TM_MEM_MANAGED_FRACTION} }"`
+            tm_heap_size_mb=$((tm_heap_size_mb - offheap_managed_memory_size))
+        fi
+    fi
+
+    echo ${tm_heap_size_mb}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index 8431408..b16abc9 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -57,39 +57,10 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
 
     if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
 
-        TM_HEAP_SIZE=${FLINK_TM_HEAP}
+        TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)
         # Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used
         TM_MAX_OFFHEAP_SIZE="8388607T"
 
-        if useOffHeapMemory; then
-            if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
-                # We split up the total memory in heap and off-heap memory
-                if [[ "${FLINK_TM_HEAP}" -le "${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
-                    echo "[ERROR] Configured TaskManager memory size ('${KEY_TASKM_MEM_SIZE}') must be larger than the managed memory size ('${KEY_TASKM_MEM_MANAGED_SIZE}')."
-                    exit 1
-                fi
-                TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
-            else
-                # Bash only performs integer arithmetic so floating point computation is performed using awk
-                command -v awk >/dev/null 2>&1
-                if [[ $? -ne 0 ]]; then
-                    echo "[ERROR] Program 'awk' not found."
-                    echo "Please install 'awk' or define '${KEY_TASKM_MEM_MANAGED_SIZE}' instead of '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}."
-                    exit 1
-                fi
-                # We calculate the memory using a fraction of the total memory
-                if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_MEM_MANAGED_FRACTION}"` != "1" ]]; then
-                    echo "[ERROR] Configured TaskManager managed memory fraction '${FLINK_TM_MEM_MANAGED_FRACTION}' is not a valid value."
-                    echo "It must be between 0.0 and 1.0."
-                    echo "Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}."
-                    exit 1
-                fi
-                # recalculate the JVM heap memory by taking the off-heap ratio into account
-                OFFHEAP_MANAGED_MEMORY_SIZE=`awk "BEGIN { printf \"%.0f\n\", ${FLINK_TM_HEAP} * ${FLINK_TM_MEM_MANAGED_FRACTION} }"`
-                TM_HEAP_SIZE=$((FLINK_TM_HEAP - OFFHEAP_MANAGED_MEMORY_SIZE))
-            fi
-        fi
-
         export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}"
 
     fi

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-dist/src/test/bin/calcTMHeapSizeMB.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/test/bin/calcTMHeapSizeMB.sh b/flink-dist/src/test/bin/calcTMHeapSizeMB.sh
new file mode 100755
index 0000000..3956643
--- /dev/null
+++ b/flink-dist/src/test/bin/calcTMHeapSizeMB.sh
@@ -0,0 +1,42 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Wrapper script to compare the TM heap size calculation of config.sh with Java
+USAGE="Usage: calcTMHeapSizeMB.sh <memTotal> <offHeap> <netBufFrac> <netBufMin> <netBufMax> <managedMemMB> <managedMemFrac>"
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+FLINK_TM_HEAP=$1
+FLINK_TM_OFFHEAP=$2
+FLINK_TM_NET_BUF_FRACTION=$3
+FLINK_TM_NET_BUF_MIN=$4
+FLINK_TM_NET_BUF_MAX=$5
+FLINK_TM_MEM_MANAGED_SIZE=$6
+FLINK_TM_MEM_MANAGED_FRACTION=$7
+
+if [[ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]]; then
+  echo "$USAGE"
+  exit 1
+fi
+
+FLINK_CONF_DIR=${bin}/../../main/resources
+. ${bin}/../../main/flink-bin/bin/config.sh
+
+calculateTaskManagerHeapSizeMB

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-dist/src/test/bin/calcTMNetBufMem.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/test/bin/calcTMNetBufMem.sh b/flink-dist/src/test/bin/calcTMNetBufMem.sh
new file mode 100755
index 0000000..9948d9c
--- /dev/null
+++ b/flink-dist/src/test/bin/calcTMNetBufMem.sh
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Wrapper script to compare the TM network buffer memory calculation of config.sh with Java
+USAGE="Usage: calcTMNetBufMem.sh <memTotal> <netBufFrac> <netBufMin> <netBufMax>"
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+FLINK_TM_HEAP=$1
+FLINK_TM_NET_BUF_FRACTION=$2
+FLINK_TM_NET_BUF_MIN=$3
+FLINK_TM_NET_BUF_MAX=$4
+
+if [[ -z "${FLINK_TM_NET_BUF_MAX}" ]]; then
+  echo "$USAGE"
+  exit 1
+fi
+
+FLINK_CONF_DIR=${bin}/../../main/resources
+. ${bin}/../../main/flink-bin/bin/config.sh
+
+calculateNetworkBufferMemory
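
Both wrapper scripts resolve their own location, point FLINK_CONF_DIR at the test resources,
source config.sh from flink-bin and then call the corresponding calculation function; the Java
test further down captures their standard output and compares it against the Java result. As an
illustration (the argument values are assumed here, not taken from the commit), invoking
calcTMNetBufMem.sh with a total of 1000 (MB), a fraction of 0.1, a minimum of 67108864 bytes
(64 MB) and a maximum of 1073741824 bytes (1 GB) should print 104857600, i.e.
min(netBufMax, max(netBufMin, 0.1 * 1000 MB)) = 100 MB, matching the formula introduced in
TaskManagerServices below.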

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
----------------------------------------------------------------------
diff --git a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
new file mode 100644
index 0000000..11d8ec7
--- /dev/null
+++ b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dist;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Random;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit test that verifies that the task manager heap size calculation used by the bash script
+ * <tt>taskmanager.sh</tt> returns the same values as the heap size calculation of
+ * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
+ *
+ * NOTE: the shell script uses <tt>awk</tt> to perform floating-point arithmetic with
+ * <tt>double</tt> precision, whereas our Java code restricts itself to <tt>float</tt> because we
+ * do not actually need high precision.
+ */
+public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
+
+	/** Key that is used by <tt>config.sh</tt>. */
+	private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
+
+	/**
+	 * Number of tests with random values.
+	 *
+	 * NOTE: calling the external test script is slow and thus low numbers are preferred for general
+	 * testing.
+	 */
+	private static final int NUM_RANDOM_TESTS = 20;
+
+	@Before
+	public void checkOperatingSystem() {
+		Assume.assumeTrue("This test checks shell scripts which are not available on Windows.",
+			!OperatingSystem.isWindows());
+	}
+
+	/**
+	 * Tests that {@link TaskManagerServices#calculateNetworkBufferMemory(long, Configuration)} has the same
+	 * result as the shell script.
+	 */
+	@Test
+	public void compareNetworkBufShellScriptWithJava() throws Exception {
+		int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
+		float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
+
+		// manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
+
+		compareNetworkBufJavaVsScript(
+			getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f);
+
+		compareNetworkBufJavaVsScript(
+			getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f);
+
+		compareNetworkBufJavaVsScript(
+			getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f);
+
+		// some automated tests with random (but valid) values
+
+		Random ran = new Random();
+		for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
+			// tolerate that values differ by 1% (due to different floating point precisions)
+			compareNetworkBufJavaVsScript(getRandomConfig(ran), 0.01f);
+		}
+	}
+
+	/**
+	 * Tests that {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)} has the same
+	 * result as the shell script.
+	 */
+	@Test
+	public void compareHeapSizeShellScriptWithJava() throws Exception {
+		int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
+		float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
+
+		// manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
+
+		compareHeapSizeJavaVsScript(
+			getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f);
+
+		compareHeapSizeJavaVsScript(
+			getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f);
+
+		compareHeapSizeJavaVsScript(
+			getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f);
+
+		// some automated tests with random (but valid) values
+
+		Random ran = new Random();
+		for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
+			// tolerate that values differ by 1% (due to different floating point precisions)
+			compareHeapSizeJavaVsScript(getRandomConfig(ran), 0.01f);
+		}
+	}
+
+	/**
+	 * Returns a flink configuration object with the given values.
+	 *
+	 * @param javaMemMB
+	 * 		total JVM memory to use (in megabytes)
+	 * @param useOffHeap
+	 * 		whether to use off-heap memory (<tt>true</tt>) or not (<tt>false</tt>)
+	 * @param netBufMemFrac
+	 * 		fraction of JVM memory to use for network buffers
+	 * @param netBufMemMin
+	 * 		minimum memory size for network buffers (in bytes)
+	 * @param netBufMemMax
+	 * 		maximum memory size for network buffers (in bytes)
+	 * @param managedMemSizeMB
+	 * 		amount of managed memory (in megabytes)
+	 * @param managedMemFrac
+	 * 		fraction of free memory to use for managed memory (if <tt>managedMemSizeMB</tt> is
+	 * 		<tt>-1</tt>)
+	 *
+	 * @return flink configuration
+	 */
+	private static Configuration getConfig(
+			final int javaMemMB, final boolean useOffHeap, final float netBufMemFrac,
+			final long netBufMemMin, final long netBufMemMax, final int managedMemSizeMB,
+			final float managedMemFrac) {
+
+		Configuration config = new Configuration();
+
+		config.setLong(KEY_TASKM_MEM_SIZE, javaMemMB);
+		config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, useOffHeap);
+
+		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, netBufMemFrac);
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, netBufMemMin);
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, netBufMemMax);
+
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemSizeMB);
+		config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, managedMemFrac);
+
+		return config;
+	}
+
+	/**
+	 * Returns a flink configuration object with random values (only those relevant to the tests in
+	 * this class).
+	 *
+	 * @param ran  random number generator
+	 *
+	 * @return flink configuration
+	 */
+	private static Configuration getRandomConfig(final Random ran) {
+
+		float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
+
+		// note: we are testing with integers only here to avoid overly complicated checks for
+		// overflowing or negative Long values - this should be enough for any practical scenario
+		// though
+		long min = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue() + ran.nextInt(Integer.MAX_VALUE);
+		long max = ran.nextInt(Integer.MAX_VALUE) + min;
+
+		int javaMemMB = Math.max((int) (max >> 20), ran.nextInt(Integer.MAX_VALUE)) + 1;
+		boolean useOffHeap = ran.nextBoolean();
+
+		int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
+		float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
+
+		if (ran.nextBoolean()) {
+			// use fixed-size managed memory
+			Configuration config = getConfig(javaMemMB, useOffHeap, frac, min, max, managedMemSize, managedMemFrac);
+			long totalJavaMemorySize = ((long) javaMemMB) << 20; // megabytes to bytes
+			final int networkBufMB =
+				(int) (TaskManagerServices.calculateNetworkBufferMemory(totalJavaMemorySize, config) >> 20);
+			// max (exclusive): total - netbuf
+			managedMemSize = Math.min(javaMemMB - networkBufMB - 1, ran.nextInt(Integer.MAX_VALUE));
+		} else {
+			// use fraction of given memory
+			managedMemFrac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
+		}
+
+		return getConfig(javaMemMB, useOffHeap, frac, min, max, managedMemSize, managedMemFrac);
+	}
+
+	// Helper functions
+
+	/**
+	 * Calculates the network buffer memory size via
+	 * {@link TaskManagerServices#calculateNetworkBufferMemory(long, Configuration)} and the shell
+	 * script and verifies that these are equal.
+	 *
+	 * @param config     flink configuration
+	 * @param tolerance  tolerate values that are off by this factor (0.01 = 1%)
+	 */
+	private void compareNetworkBufJavaVsScript(final Configuration config, final float tolerance)
+			throws IOException {
+
+		final long totalJavaMemorySizeMB = config.getLong(KEY_TASKM_MEM_SIZE, 0L);
+
+		long javaNetworkBufMem = TaskManagerServices.calculateNetworkBufferMemory(totalJavaMemorySizeMB << 20, config);
+
+		String[] command = {"src/test/bin/calcTMNetBufMem.sh",
+			String.valueOf(totalJavaMemorySizeMB),
+			String.valueOf(config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
+			String.valueOf(config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)),
+			String.valueOf(config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX))};
+		String scriptOutput = executeScript(command);
+
+		long absoluteTolerance = (long) (javaNetworkBufMem * tolerance);
+		if (absoluteTolerance < 1) {
+			assertEquals(
+				"Different network buffer memory sizes with configuration: " + config.toString(),
+				String.valueOf(javaNetworkBufMem), scriptOutput);
+		} else {
+			Long scriptNetworkBufMem = Long.valueOf(scriptOutput);
+			assertThat(
+				"Different network buffer memory sizes (Java: " + javaNetworkBufMem + ", Script: " + scriptNetworkBufMem +
+					") with configuration: " + config.toString(), scriptNetworkBufMem,
+				allOf(greaterThanOrEqualTo(javaNetworkBufMem - absoluteTolerance),
+					lessThanOrEqualTo(javaNetworkBufMem + absoluteTolerance)));
+		}
+	}
+
+	/**
+	 * Calculates the heap size via
+	 * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)} and the shell script
+	 * and verifies that these are equal.
+	 *
+	 * @param config     flink configuration
+	 * @param tolerance  tolerate values that are off by this factor (0.01 = 1%)
+	 */
+	private void compareHeapSizeJavaVsScript(final Configuration config, float tolerance)
+			throws IOException {
+
+		final long totalJavaMemorySizeMB = config.getLong(KEY_TASKM_MEM_SIZE, 0L);
+
+		long javaHeapSizeMB = TaskManagerServices.calculateHeapSizeMB(totalJavaMemorySizeMB, config);
+
+		String[] command = {"src/test/bin/calcTMHeapSizeMB.sh",
+			String.valueOf(totalJavaMemorySizeMB),
+			String.valueOf(config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)),
+			String.valueOf(config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
+			String.valueOf(config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)),
+			String.valueOf(config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)),
+			String.valueOf(config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE)),
+			String.valueOf(config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION))};
+		String scriptOutput = executeScript(command);
+
+		long absoluteTolerance = (long) (javaHeapSizeMB * tolerance);
+		if (absoluteTolerance < 1) {
+			assertEquals("Different heap sizes with configuration: " + config.toString(),
+				String.valueOf(javaHeapSizeMB), scriptOutput);
+		} else {
+			Long scriptHeapSizeMB = Long.valueOf(scriptOutput);
+			assertThat(
+				"Different heap sizes (Java: " + javaHeapSizeMB + ", Script: " + scriptHeapSizeMB +
+					") with configuration: " + config.toString(), scriptHeapSizeMB,
+				allOf(greaterThanOrEqualTo(javaHeapSizeMB - absoluteTolerance),
+					lessThanOrEqualTo(javaHeapSizeMB + absoluteTolerance)));
+		}
+	}
+
+	/**
+	 * Executes the given shell script wrapper and returns its output.
+	 *
+	 * @param command  command to run
+	 *
+	 * @return raw script output
+	 */
+	private String executeScript(final String[] command) throws IOException {
+		ProcessBuilder pb = new ProcessBuilder(command);
+		pb.redirectErrorStream(true);
+		Process process = pb.start();
+		BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+		StringBuilder sb = new StringBuilder();
+		String s;
+		while ((s = reader.readLine()) != null) {
+			sb.append(s);
+		}
+		return sb.toString();
+	}
+
+}
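
Outside the JUnit harness, a minimal stand-alone sketch of the two calculation methods compared
above could look as follows (the class name and printed output are illustrative only; the
configuration values mirror the third manual test case above, and "taskmanager.heap.mb" is the
legacy key the test uses to carry the total JVM memory size):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.TaskManagerOptions;
    import org.apache.flink.runtime.taskexecutor.TaskManagerServices;

    public class HeapSizeCalcSketch {
        public static void main(String[] args) {
            Configuration config = new Configuration();
            config.setLong("taskmanager.heap.mb", 1000);                               // total JVM memory in MB
            config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);               // managed memory goes off-heap
            config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
            config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 64L << 20);  // 64 MB
            config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 30);   // 1 GB
            config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.1f);         // managed memory size stays at its default (-1)

            // network buffer budget in bytes, derived from the total JVM memory given in bytes
            long networkBufBytes = TaskManagerServices.calculateNetworkBufferMemory(1000L << 20, config);

            // JVM heap in MB after reserving network buffer and managed memory
            long heapSizeMB = TaskManagerServices.calculateHeapSizeMB(1000, config);

            System.out.println(networkBufBytes + " bytes of network buffers, " + heapSizeMB + " MB heap");
        }
    }

With these values the formulas added to TaskManagerServices (see the diff further down) should
yield 104857600 bytes (100 MB) of network buffer memory and a heap of 810 MB: 1000 MB minus
100 MB of network buffers leaves 900 MB, of which 10% (90 MB) becomes off-heap managed memory.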

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-dist/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-dist/src/test/resources/log4j-test.properties b/flink-dist/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..9f24837
--- /dev/null
+++ b/flink-dist/src/test/resources/log4j-test.properties
@@ -0,0 +1,35 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, console
+
+# -----------------------------------------------------------------------------
+# Console (use 'console')
+# -----------------------------------------------------------------------------
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# -----------------------------------------------------------------------------
+# File (use 'file')
+# -----------------------------------------------------------------------------
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.file=${log.dir}/${mvn.forkNumber}.log
+log4j.appender.file.append=false
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index e9d3cbd..ea508d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -309,7 +309,7 @@ public class BootstrapTools {
 	 * Get an instance of the dynamic properties option.
 	 *
 	 * Dynamic properties allow the user to specify additional configuration values with -D, such as
-	 *  -Dfs.overwrite-files=true  -Dtaskmanager.network.numberOfBuffers=16368
+	 * <tt> -Dfs.overwrite-files=true  -Dtaskmanager.network.memory.min=536346624</tt>
      */
 	public static Option newDynamicPropertiesOption() {
 		return new Option(DYNAMIC_PROPERTIES_OPT, true, "Dynamic properties");

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
index 8ff3c25..9d679cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.clusterframework;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -141,26 +141,10 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 
 		final long javaMemorySizeMB = containerMemoryMB - cutoff;
 
-		// (2) split the Java memory between heap and off-heap
+		// (2) split the remaining Java memory between heap and off-heap
+		final long heapSizeMB = TaskManagerServices.calculateHeapSizeMB(javaMemorySizeMB, config);
+		final long offHeapSize = javaMemorySizeMB == heapSizeMB ? -1L : javaMemorySizeMB - heapSizeMB; 
 
-		final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);
-
-		final long heapSizeMB;
-		long offHeapSize = -1;
-		if (useOffHeap) {
-			offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
-
-			if (offHeapSize <= 0) {
-				double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
-
-				offHeapSize = (long) (fraction * javaMemorySizeMB);
-			}
-
-			heapSizeMB = javaMemorySizeMB - offHeapSize;
-		} else {
-			heapSizeMB = javaMemorySizeMB;
-		}
-		
 		// (3) obtain the additional environment variables from the configuration
 		final HashMap<String, String> envVars = new HashMap<>();
 		final String prefix = ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
@@ -172,7 +156,7 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 				envVars.put(envVarKey, config.getString(key, null));
 			}
 		}
-		
+
 		// done
 		return new ContaineredTaskManagerParameters(
 			containerMemoryMB, heapSizeMB, offHeapSize, numSlots, envVars);
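
Since TaskManagerServices.calculateHeapSizeMB() subtracts both the network buffer memory and the
managed memory when off-heap memory is enabled, the offHeapSize reported here now covers both
budgets rather than only the managed memory as before. For illustration (all values assumed):
with javaMemorySizeMB = 1536, a 0.1 network buffer fraction and a 0.1 managed memory fraction,
roughly 153 MB would go to network buffers and 138 MB to managed memory, giving
heapSizeMB = 1245 and offHeapSize = 291.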

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index a36bdf4..1eb44c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -194,12 +194,15 @@ public class NetworkBufferPool implements BufferPoolFactory {
 			if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) {
 				throw new IOException(String.format("Insufficient number of network buffers: " +
 								"required %d, but only %d available. The total number of network " +
-								"buffers is currently set to %d. You can increase this " +
-								"number by setting the configuration key '%s'.",
+								"buffers is currently set to %d of %d bytes each. You can increase this " +
+								"number by setting the configuration keys '%s', '%s', and '%s'.",
 						numRequiredBuffers,
 						totalNumberOfMemorySegments - numTotalRequiredBuffers,
 						totalNumberOfMemorySegments,
-						TaskManagerOptions.NETWORK_NUM_BUFFERS.key()));
+						memorySegmentSize,
+						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
+						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
+						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
 			}
 
 			this.numTotalRequiredBuffers += numRequiredBuffers;

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 11c6d16..654d528 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -52,8 +52,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * <p>Since the network buffer pool size for outgoing partitions is usually
  * quite small, e.g. via the {@link TaskManagerOptions#NETWORK_BUFFERS_PER_CHANNEL}
  * and {@link TaskManagerOptions#NETWORK_EXTRA_BUFFERS_PER_GATE} parameters
- * for bounded channels or from the default value of
- * {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}, most spillable partitions
+ * for bounded channels or from the default values of
+ * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+ * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN}, and
+ * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, most spillable partitions
  * will be spilled for real-world data sets.
  */
 class SpillableSubpartition extends ResultSubpartition {

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 823b3f2..b8d6bbb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -190,7 +191,7 @@ public class MiniClusterConfiguration {
 	 * 3. Distribute the available free memory equally among all components (JMs, RMs and TMs) and
 	 * calculate the managed memory from the share of memory for a single task manager.
 	 *
-	 * @return
+	 * @return size of managed memory per task manager (in megabytes)
 	 */
 	private long getOrCalculateManagedMemoryPerTaskManager() {
 		if (managedMemoryPerTaskManager == -1) {
@@ -206,9 +207,6 @@ public class MiniClusterConfiguration {
 				// share the available memory among all running components
 
 				float memoryFraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
-				long networkBuffersMemory =
-					(long) config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS) *
-						(long) config.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);
 
 				long freeMemory = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();
 
@@ -217,12 +215,13 @@ public class MiniClusterConfiguration {
 				long memoryPerComponent = freeMemory / (numTaskManagers + numResourceManagers + numJobManagers);
 
 				// subtract the network buffer memory
+				long networkBuffersMemory = TaskManagerServices.calculateNetworkBufferMemory(memoryPerComponent, config);
 				long memoryMinusNetworkBuffers = memoryPerComponent - networkBuffersMemory;
 
 				// calculate the managed memory size
 				long managedMemoryBytes = (long) (memoryMinusNetworkBuffers * memoryFraction);
 
-				return managedMemoryBytes >>> 20;
+				return managedMemoryBytes >> 20; // bytes to megabytes
 			} else {
 				return memorySize;
 			}
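
As a rough, assumed example of the resulting distribution: with 4 GB of free heap shared by one
job manager, one resource manager and two task managers, each component gets about 1 GB. Assuming
a network buffer fraction of 0.1 (and min/max bounds that do not bind), roughly 100 MB of that
share is reserved for network buffers, and a managed memory fraction of 0.7 then yields roughly
645 MB of managed memory per task manager.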

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index e3c8345..ecf81d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -46,7 +48,6 @@ import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.Preconditions;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -256,9 +257,11 @@ public class TaskManagerServices {
 			}
 			memorySize = configuredMemory << 20; // megabytes to bytes
 		} else {
+			// similar to #calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig)
 			float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction();
 
 			if (memType == MemoryType.HEAP) {
+				// network buffers already allocated -> use memoryFraction of the remaining heap:
 				long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * memoryFraction);
 				if (preAllocateMemory) {
 					LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
@@ -269,7 +272,10 @@ public class TaskManagerServices {
 				}
 				memorySize = relativeMemSize;
 			} else if (memType == MemoryType.OFF_HEAP) {
-				// The maximum heap memory has been adjusted according to the fraction
+				// The maximum heap memory has been adjusted according to the fraction (see
+				// calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)), i.e.
+				// maxJvmHeap = jvmHeapNoNet - jvmHeapNoNet * memoryFraction = jvmHeapNoNet * (1 - memoryFraction)
+				// directMemorySize = jvmHeapNoNet * memoryFraction
 				long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();
 				long directMemorySize = (long) (maxMemory / (1.0 - memoryFraction) * memoryFraction);
 				if (preAllocateMemory) {
@@ -321,9 +327,19 @@ public class TaskManagerServices {
 
 		NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskManagerServicesConfiguration.getNetworkConfig();
 
+		final long networkBuf = calculateNetworkBufferMemory(taskManagerServicesConfiguration);
+		int segmentSize = networkEnvironmentConfiguration.networkBufferSize();
+
+		// tolerate a leftover between intended and allocated memory due to segmentation (it remains available as user-space memory)
+		final long numNetBuffersLong = networkBuf / segmentSize;
+		if (numNetBuffersLong > Integer.MAX_VALUE) {
+			throw new IllegalArgumentException("The given number of memory bytes (" + networkBuf
+				+ ") corresponds to more than MAX_INT pages.");
+		}
+
 		NetworkBufferPool networkBufferPool = new NetworkBufferPool(
-			networkEnvironmentConfiguration.numNetworkBuffers(),
-			networkEnvironmentConfiguration.networkBufferSize(),
+			(int) numNetBuffersLong,
+			segmentSize,
 			networkEnvironmentConfiguration.memoryType());
 
 		ConnectionManager connectionManager;
@@ -376,6 +392,217 @@ public class TaskManagerServices {
 	}
 
 	/**
+	 * Calculates the amount of memory used for network buffers based on the total memory to use and
+	 * the corresponding configuration parameters.
+	 *
+	 * The following configuration parameters are involved:
+	 * <ul>
+	 *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
+	 * 	<li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
+	 * 	<li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li>
+	 *  <li>{@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li>
+	 * </ul>.
+	 *
+	 * @param totalJavaMemorySize
+	 * 		overall available memory to use (heap and off-heap, in bytes)
+	 * @param config
+	 * 		configuration object
+	 *
+	 * @return memory to use for network buffers (in bytes)
+	 */
+	@SuppressWarnings("deprecation")
+	public static long calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config) {
+		Preconditions.checkArgument(totalJavaMemorySize > 0);
+
+		int segmentSize = config.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);
+
+		final long networkBufBytes;
+		if (TaskManagerServicesConfiguration.hasNewNetworkBufConf(config)) {
+			// new configuration based on fractions of available memory with selectable min and max
+			float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
+			long networkBufMin = config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN);
+			long networkBufMax = config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX);
+
+			TaskManagerServicesConfiguration
+				.checkNetworkBufferConfig(segmentSize, networkBufFraction, networkBufMin, networkBufMax);
+
+			networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin,
+				(long) (networkBufFraction * totalJavaMemorySize)));
+
+			TaskManagerServicesConfiguration
+				.checkConfigParameter(networkBufBytes < totalJavaMemorySize,
+					"(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
+					"(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
+						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
+						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
+					"Network buffer memory size too large: " + networkBufBytes + " >= " +
+						totalJavaMemorySize + " (total JVM memory size)");
+		} else {
+			// use old (deprecated) network buffers parameter
+			int numNetworkBuffers = config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+			networkBufBytes = (long) numNetworkBuffers * (long) segmentSize;
+
+			TaskManagerServicesConfiguration.checkNetworkConfigOld(numNetworkBuffers);
+
+			TaskManagerServicesConfiguration
+				.checkConfigParameter(networkBufBytes < totalJavaMemorySize,
+					networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
+					"Network buffer memory size too large: " + networkBufBytes + " >= " +
+						totalJavaMemorySize + " (total JVM memory size)");
+		}
+
+		return networkBufBytes;
+	}
+
+	/**
+	 * Calculates the amount of memory used for network buffers inside the current JVM instance
+	 * based on the available heap or the max heap size and the corresponding configuration parameters.
+	 *
+	 * For containers or when started via scripts, if started with a memory limit and set to use
+	 * off-heap memory, the maximum heap size for the JVM is adjusted accordingly and we are able
+	 * to extract the intended values from this.
+	 *
+	 * The following configuration parameters are involved:
+	 * <ul>
+	 *  <li>{@link TaskManagerOptions#MANAGED_MEMORY_FRACTION},</li>
+	 *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
+	 * 	<li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
+	 * 	<li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li>
+	 *  <li>{@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li>
+	 * </ul>.
+	 *
+	 * @param tmConfig task manager services configuration object
+	 *
+	 * @return memory to use for network buffers (in bytes)
+	 */
+	public static long calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig) {
+		final NetworkEnvironmentConfiguration networkConfig = tmConfig.getNetworkConfig();
+
+		final float networkBufFraction = networkConfig.networkBufFraction();
+		final long networkBufMin = networkConfig.networkBufMin();
+		final long networkBufMax = networkConfig.networkBufMax();
+
+		if (networkBufMin == networkBufMax) {
+			// fixed network buffer pool size
+			return networkBufMin;
+		}
+
+		// relative network buffer pool size using the fraction
+
+		final MemoryType memType = networkConfig.memoryType();
+
+		final long networkBufBytes;
+		if (memType == MemoryType.HEAP) {
+			// use fraction parts of the available heap memory
+
+			final long relativeMemSize = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();
+			networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin,
+				(long) (networkBufFraction * relativeMemSize)));
+
+			TaskManagerServicesConfiguration
+				.checkConfigParameter(networkBufBytes < relativeMemSize,
+					"(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
+					"(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
+						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
+						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
+					"Network buffer memory size too large: " + networkBufBytes + " >= " +
+						relativeMemSize + " (free JVM heap size)");
+		} else if (memType == MemoryType.OFF_HEAP) {
+			// The maximum heap memory has been adjusted accordingly as in
+			// calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config))
+			// and we need to invert these calculations.
+
+			final long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();
+
+			// check if a value has been configured
+			long configuredMemory = tmConfig.getConfiguredMemory() << 20; // megabytes to bytes
+
+			final long jvmHeapNoNet;
+			if (configuredMemory > 0) {
+				// The maximum heap memory has been adjusted according to configuredMemory, i.e.
+				// maxJvmHeap = jvmHeapNoNet - configuredMemory
+
+				jvmHeapNoNet = maxMemory + configuredMemory;
+			} else {
+				// The maximum heap memory has been adjusted according to the fraction, i.e.
+				// maxJvmHeap = jvmHeapNoNet - jvmHeapNoNet * managedFraction = jvmHeapNoNet * (1 - managedFraction)
+
+				final float managedFraction = tmConfig.getMemoryFraction();
+				jvmHeapNoNet = (long) (maxMemory / (1.0 - managedFraction));
+			}
+
+			// finally extract the network buffer memory size again from:
+			// jvmHeapNoNet = jvmHeap - networkBufBytes
+			//              = jvmHeap - Math.min(networkBufMax, Math.max(networkBufMin, jvmHeap * netFraction)
+			networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin,
+				(long) (jvmHeapNoNet / (1.0 - networkBufFraction) * networkBufFraction)));
+
+			TaskManagerServicesConfiguration
+				.checkConfigParameter(networkBufBytes < maxMemory,
+					"(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
+					"(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
+						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
+						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
+					"Network buffer memory size too large: " + networkBufBytes + " >= " +
+						maxMemory + " (maximum JVM heap size)");
+		} else {
+			throw new RuntimeException("No supported memory type detected.");
+		}
+
+		return networkBufBytes;
+	}
+
+	/**
+	 * Calculates the amount of heap memory to use (to set via <tt>-Xmx</tt> and <tt>-Xms</tt>)
+	 * based on the total memory to use and the given configuration parameters.
+	 *
+	 * @param totalJavaMemorySizeMB
+	 * 		overall available memory to use (heap and off-heap)
+	 * @param config
+	 * 		configuration object
+	 *
+	 * @return heap memory to use (in megabytes)
+	 */
+	public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
+		Preconditions.checkArgument(totalJavaMemorySizeMB > 0);
+
+		final long totalJavaMemorySize = totalJavaMemorySizeMB << 20; // megabytes to bytes
+
+		// split the available Java memory between heap and off-heap
+
+		final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);
+
+		final long heapSizeMB;
+		if (useOffHeap) {
+
+			// subtract the Java memory used for network buffers
+			final long networkBufMB = calculateNetworkBufferMemory(totalJavaMemorySize, config) >> 20; // bytes to megabytes
+			final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;
+
+			long offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+
+			if (offHeapSize <= 0) {
+				// calculate off-heap section via fraction
+				double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
+				offHeapSize = (long) (fraction * remainingJavaMemorySizeMB);
+			}
+
+			TaskManagerServicesConfiguration
+				.checkConfigParameter(offHeapSize < remainingJavaMemorySizeMB, offHeapSize,
+					TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
+					"Managed memory size too large for " + networkBufMB +
+						" MB network buffer memory and a total of " + totalJavaMemorySizeMB +
+						" MB JVM memory");
+
+			heapSizeMB = remainingJavaMemorySizeMB - offHeapSize;
+		} else {
+			heapSizeMB = totalJavaMemorySizeMB;
+		}
+
+		return heapSizeMB;
+	}
+
+	/**
 	 * Validates that all the directories denoted by the strings do actually exist, are proper
 	 * directories (not files), and are writable.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 366be34..3fee689 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -34,6 +34,8 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.util.MathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.net.InetAddress;
@@ -47,6 +49,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * the io manager and the metric registry
  */
 public class TaskManagerServicesConfiguration {
+	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServicesConfiguration.class);
 
 	private final InetAddress taskManagerAddress;
 
@@ -58,6 +61,11 @@ public class TaskManagerServicesConfiguration {
 
 	private final QueryableStateConfiguration queryableStateConfig;
 
+	/**
+	 * Managed memory (in megabytes).
+	 *
+	 * @see TaskManagerOptions#MANAGED_MEMORY_SIZE
+	 */
 	private final long configuredMemory;
 
 	private final boolean preAllocateMemory;
@@ -126,6 +134,13 @@ public class TaskManagerServicesConfiguration {
 		return memoryFraction;
 	}
 
+	/**
+	 * Returns the size of the managed memory (in megabytes), if configured.
+	 *
+	 * @return managed memory or a default value (currently <tt>-1</tt>) if not configured
+	 *
+	 * @see TaskManagerOptions#MANAGED_MEMORY_SIZE
+	 */
 	public long getConfiguredMemory() {
 		return configuredMemory;
 	}
@@ -228,6 +243,7 @@ public class TaskManagerServicesConfiguration {
 	 * @param slots to start the task manager with
 	 * @return Network environment configuration
 	 */
+	@SuppressWarnings("deprecation")
 	private static NetworkEnvironmentConfiguration parseNetworkEnvironmentConfiguration(
 		Configuration configuration,
 		boolean localTaskManagerCommunication,
@@ -245,11 +261,6 @@ public class TaskManagerServicesConfiguration {
 		checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
 			"Number of task slots must be at least one.");
 
-		final int numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
-
-		checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
-			TaskManagerOptions.NETWORK_NUM_BUFFERS.key(), "");
-
 		final int pageSize = configuration.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);
 
 		// check page size of for minimum size
@@ -284,6 +295,27 @@ public class TaskManagerServicesConfiguration {
 			}
 		}
 
+		// network buffer memory fraction
+
+		float networkBufFraction = configuration.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
+		long networkBufMin = configuration.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN);
+		long networkBufMax = configuration.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX);
+		checkNetworkBufferConfig(pageSize, networkBufFraction, networkBufMin, networkBufMax);
+
+		// fallback: number of network buffers
+		final int numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+		checkNetworkConfigOld(numNetworkBuffers);
+
+		if (!hasNewNetworkBufConf(configuration)) {
+			// map old config to new one:
+			networkBufMin = networkBufMax = numNetworkBuffers * pageSize;
+		} else {
+			if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+				LOG.info("Ignoring old (but still present) network buffer configuration via {}.",
+					TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+			}
+		}
+
 		final NettyConfig nettyConfig;
 		if (!localTaskManagerCommunication) {
 			final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);
@@ -317,7 +349,9 @@ public class TaskManagerServicesConfiguration {
 			TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
 
 		return new NetworkEnvironmentConfiguration(
-			numNetworkBuffers,
+			networkBufFraction,
+			networkBufMin,
+			networkBufMax,
 			pageSize,
 			memType,
 			ioMode,
@@ -329,6 +363,69 @@ public class TaskManagerServicesConfiguration {
 	}
 
 	/**
+	 * Validates the (old) network buffer configuration.
+	 *
+	 * @param numNetworkBuffers		number of buffers used in the network stack
+	 *
+	 * @throws IllegalConfigurationException if the condition does not hold
+	 */
+	@SuppressWarnings("deprecation")
+	protected static void checkNetworkConfigOld(final int numNetworkBuffers) {
+		checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
+			TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
+			"Must have at least one network buffer");
+	}
+
+	/**
+	 * Validates the (new) network buffer configuration.
+	 *
+	 * @param pageSize 				size of memory buffers
+	 * @param networkBufFraction	fraction of JVM memory to use for network buffers
+	 * @param networkBufMin 		minimum memory size for network buffers (in bytes)
+	 * @param networkBufMax 		maximum memory size for network buffers (in bytes)
+	 *
+	 * @throws IllegalConfigurationException if the condition does not hold
+	 */
+	protected static void checkNetworkBufferConfig(
+			final int pageSize, final float networkBufFraction, final long networkBufMin,
+			final long networkBufMax) throws IllegalConfigurationException {
+
+		checkConfigParameter(networkBufFraction > 0.0f && networkBufFraction < 1.0f, networkBufFraction,
+			TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
+			"Network buffer memory fraction of the free memory must be between 0.0 and 1.0");
+
+		checkConfigParameter(networkBufMin >= pageSize, networkBufMin,
+			TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
+			"Minimum memory for network buffers must allow at least one network " +
+				"buffer with respect to the memory segment size");
+
+		checkConfigParameter(networkBufMax >= pageSize, networkBufMax,
+			TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key(),
+			"Maximum memory for network buffers must allow at least one network " +
+				"buffer with respect to the memory segment size");
+
+		checkConfigParameter(networkBufMax >= networkBufMin, networkBufMax,
+			TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key(),
+			"Maximum memory for network buffers must not be smaller than minimum memory (" +
+				TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ": " + networkBufMin + ")");
+	}
+
+	/**
+	 * Returns whether the new network buffer memory configuration is present in the configuration
+	 * object, i.e. at least one new parameter is given or the old one is not present.
+	 *
+	 * @param config configuration object
+	 * @return <tt>true</tt> if the new configuration method is used, <tt>false</tt> otherwise
+	 */
+	@SuppressWarnings("deprecation")
+	public static boolean hasNewNetworkBufConf(final Configuration config) {
+		return config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION) ||
+			config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN) ||
+			config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX) ||
+			!config.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+	}
+
+	/**
 	 * Creates the {@link QueryableStateConfiguration} from the given Configuration.
 	 */
 	private static QueryableStateConfiguration parseQueryableStateConfiguration(Configuration config) {
@@ -353,8 +450,11 @@ public class TaskManagerServicesConfiguration {
 	 * @param parameter         The parameter value. Will be shown in the exception message.
 	 * @param name              The name of the config parameter. Will be shown in the exception message.
 	 * @param errorMessage  The optional custom error message to append to the exception message.
+	 *
+	 * @throws IllegalConfigurationException if the condition does not hold
 	 */
-	private static void checkConfigParameter(boolean condition, Object parameter, String name, String errorMessage) {
+	static void checkConfigParameter(boolean condition, Object parameter, String name, String errorMessage)
+			throws IllegalConfigurationException {
 		if (!condition) {
 			throw new IllegalConfigurationException("Invalid configuration value for " + 
 					name + " : " + parameter + " - " + errorMessage);
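
For clarity on the backwards-compatibility path: hasNewNetworkBufConf() only returns false when
none of the three new keys is set and the old taskmanager.network.numberOfBuffers key is present.
In that case the old setting is mapped onto a fixed-size pool by making the minimum and maximum
equal to numNetworkBuffers * pageSize; for example (old default values assumed), 2048 buffers of
32768 bytes each would map to networkBufMin = networkBufMax = 67108864 bytes, i.e. a fixed 64 MB
network buffer pool, and the fraction is then ignored.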


[08/12] flink git commit: [FLINK-6470] [core] Add a utility to parse memory sizes

Posted by se...@apache.org.
[FLINK-6470] [core] Add a utility to parse memory sizes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50b8dda3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50b8dda3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50b8dda3

Branch: refs/heads/master
Commit: 50b8dda37d5194297e4a6e41460fbc13e67a393b
Parents: aed3b80
Author: Stephan Ewen <se...@apache.org>
Authored: Fri May 5 13:41:29 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat May 6 19:41:53 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/configuration/MemorySize.java  | 251 +++++++++++++++++++
 .../flink/configuration/MemorySizeTest.java     | 204 +++++++++++++++
 2 files changed, 455 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/50b8dda3/flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java b/flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java
new file mode 100644
index 0000000..c34c721
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Locale;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * MemorySize is a representation of a number of bytes, viewable in different units.
+ * 
+ * <h2>Parsing</h2>
+ * 
+ * The size can be parsed from a text expression. If the expression is a pure number,
+ * the value will be interpreted as bytes.
+ * 
+ * <p>To make larger values more compact, the common size suffixes are supported:
+ * 
+ * <ul>
+ *     <li>1 or 1b or 1bytes (bytes)
+ *     <li>1k or 1kb or 1kibibytes (interpreted as kibibytes = 1024 bytes)
+ *     <li>1m or 1mb or 1mebibytes (interpreted as mebibytes = 1024 kibibytes)
+ *     <li>1g or 1gb or 1gibibytes (interpreted as gibibytes = 1024 mebibytes)
+ *     <li>1t or 1tb or 1tebibytes (interpreted as tebibytes = 1024 gibibytes)
+ * </ul>
+ */
+@PublicEvolving
+public class MemorySize implements java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final String[] BYTES_UNITS = { "b", "bytes" };
+
+	private static final String[] KILO_BYTES_UNITS = { "k", "kb", "kibibytes" };
+
+	private static final String[] MEGA_BYTES_UNITS = { "m", "mb", "mebibytes" };
+
+	private static final String[] GIGA_BYTES_UNITS = { "g", "gb", "gibibytes" };
+
+	private static final String[] TERA_BYTES_UNITS = { "t", "tb", "tebibytes" };
+
+	private static final String ALL_UNITS = concatenateUnits(
+			BYTES_UNITS, KILO_BYTES_UNITS, MEGA_BYTES_UNITS, GIGA_BYTES_UNITS, TERA_BYTES_UNITS);
+
+	// ------------------------------------------------------------------------
+
+	/** The memory size, in bytes */
+	private final long bytes;
+
+	/**
+	 * Constructs a new MemorySize.
+	 * 
+	 * @param bytes The size, in bytes. Must be zero or larger.
+	 */
+	public MemorySize(long bytes) {
+		checkArgument(bytes >= 0, "bytes must be >= 0");
+		this.bytes = bytes;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the memory size in bytes.
+	 */
+	public long getBytes() {
+		return bytes;
+	}
+
+	/**
+	 * Gets the memory size in Kibibytes (= 1024 bytes).
+	 */
+	public long getKibiBytes() {
+		return bytes >> 10;
+	}
+
+	/**
+	 * Gets the memory size in Mebibytes (= 1024 Kibibytes).
+	 */
+	public long getMebiBytes() {
+		return bytes >> 20;
+	}
+
+	/**
+	 * Gets the memory size in Gibibytes (= 1024 Mebibytes).
+	 */
+	public long getGibiBytes() {
+		return bytes >> 30;
+	}
+
+	/**
+	 * Gets the memory size in Tebibytes (= 1024 Gibibytes).
+	 */
+	public long getTebiBytes() {
+		return bytes >> 40;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return (int) (bytes ^ (bytes >>> 32));
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj == this || obj.getClass() == this.getClass() && ((MemorySize) obj).bytes == this.bytes;
+	}
+
+	@Override
+	public String toString() {
+		return bytes + " bytes";
+	}
+
+	// ------------------------------------------------------------------------
+	//  Parsing
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Parses the given string as as MemorySize.
+	 * The supported expressions are listed under {@link MemorySize}.
+	 * 
+	 * @param text The string to parse
+	 * @return The parsed MemorySize
+	 * 
+	 * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
+	 */
+	public static MemorySize parse(String text) throws IllegalArgumentException {
+		return new MemorySize(parseBytes(text));
+	}
+
+	/**
+	 * Parses the given string as bytes.
+	 * The supported expressions are listed under {@link MemorySize}.
+	 * 
+	 * @param text The string to parse
+	 * @return The parsed size, in bytes.
+	 * 
+	 * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
+	 */
+	public static long parseBytes(String text) throws IllegalArgumentException {
+		checkNotNull(text, "text");
+
+		final String trimmed = text.trim();
+		checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");
+
+		final int len = trimmed.length();
+		int pos = 0;
+
+		char current;
+		while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
+			pos++;
+		}
+
+		final String number = trimmed.substring(0, pos);
+		final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);
+
+		if (number.isEmpty()) {
+			throw new NumberFormatException("text does not start with a number");
+		}
+
+		final long value;
+		try {
+			value = Long.parseLong(number); // this throws a NumberFormatException on overflow
+		}
+		catch (NumberFormatException e) {
+			throw new IllegalArgumentException("The value '" + number +
+					"' cannot be represented as a 64bit number (numeric overflow).");
+		}
+
+		final long multiplier;
+		if (unit.isEmpty()) {
+			multiplier = 1L;
+		}
+		else {
+			if (matchesAny(unit, BYTES_UNITS)) {
+				multiplier = 1L;
+			}
+			else if (matchesAny(unit, KILO_BYTES_UNITS)) {
+				multiplier = 1024L;
+			}
+			else if (matchesAny(unit, MEGA_BYTES_UNITS)) {
+				multiplier = 1024L * 1024L;
+			}
+			else if (matchesAny(unit, GIGA_BYTES_UNITS)) {
+				multiplier = 1024L * 1024L * 1024L;
+			}
+			else if (matchesAny(unit, TERA_BYTES_UNITS)) {
+				multiplier = 1024L * 1024L * 1024L * 1024L;
+			}
+			else {
+				throw new IllegalArgumentException("Memory size unit '" + unit + 
+						"' does not match any of the recognized units: " + ALL_UNITS);
+			}
+		}
+
+		try {
+			return Math.multiplyExact(value, multiplier);
+		}
+		catch (ArithmeticException e) {
+			throw new IllegalArgumentException("The value '" + text +
+					"' cannot be represented as a 64bit number of bytes (numeric overflow).");
+		}
+	}
+
+	private static boolean matchesAny(String str, String[] variants) {
+		for (String s : variants) {
+			if (s.equals(str)) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	private static String concatenateUnits(final String[]... allUnits) {
+		final StringBuilder builder = new StringBuilder(128);
+
+		for (String[] units : allUnits) {
+			builder.append('(');
+
+			for (String unit : units) {
+				builder.append(unit);
+				builder.append(" | ");
+			}
+
+			builder.setLength(builder.length() - 3);
+			builder.append(") / ");
+		}
+
+		builder.setLength(builder.length() - 3);
+		return builder.toString();
+	}
+}
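
For orientation, here is a minimal usage sketch of the parsing API introduced above. It is an
illustration only and not part of the commit itself; the class and method names are taken from the
diff, the literal values are made up.

    import org.apache.flink.configuration.MemorySize;

    public class MemorySizeExample {

        public static void main(String[] args) {
            // a plain number is interpreted as bytes
            MemorySize plain = MemorySize.parse("67108864");
            System.out.println(plain.getMebiBytes());     // 64

            // units may be separated by whitespace and are parsed case-insensitively
            MemorySize withUnit = MemorySize.parse("512 mb");
            System.out.println(withUnit.getBytes());      // 536870912
            System.out.println(withUnit.getKibiBytes());  // 524288

            // unknown units are rejected with an IllegalArgumentException
            try {
                MemorySize.parseBytes("16 gjah");
            } catch (IllegalArgumentException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }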

http://git-wip-us.apache.org/repos/asf/flink/blob/50b8dda3/flink-core/src/test/java/org/apache/flink/configuration/MemorySizeTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/MemorySizeTest.java b/flink-core/src/test/java/org/apache/flink/configuration/MemorySizeTest.java
new file mode 100644
index 0000000..dbdd96b
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/configuration/MemorySizeTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+/**
+ * Tests for the {@link MemorySize} class.
+ */
+public class MemorySizeTest {
+
+	@Test
+	public void testUnitConversion() {
+		final MemorySize zero = new MemorySize(0);
+		assertEquals(0, zero.getBytes());
+		assertEquals(0, zero.getKibiBytes());
+		assertEquals(0, zero.getMebiBytes());
+		assertEquals(0, zero.getGibiBytes());
+		assertEquals(0, zero.getTebiBytes());
+
+		final MemorySize bytes = new MemorySize(955);
+		assertEquals(955, bytes.getBytes());
+		assertEquals(0, bytes.getKibiBytes());
+		assertEquals(0, bytes.getMebiBytes());
+		assertEquals(0, bytes.getGibiBytes());
+		assertEquals(0, bytes.getTebiBytes());
+
+		final MemorySize kilos = new MemorySize(18500);
+		assertEquals(18500, kilos.getBytes());
+		assertEquals(18, kilos.getKibiBytes());
+		assertEquals(0, kilos.getMebiBytes());
+		assertEquals(0, kilos.getGibiBytes());
+		assertEquals(0, kilos.getTebiBytes());
+
+		final MemorySize megas = new MemorySize(15 * 1024 * 1024);
+		assertEquals(15_728_640, megas.getBytes());
+		assertEquals(15_360, megas.getKibiBytes());
+		assertEquals(15, megas.getMebiBytes());
+		assertEquals(0, megas.getGibiBytes());
+		assertEquals(0, megas.getTebiBytes());
+
+		final MemorySize teras = new MemorySize(2L * 1024 * 1024 * 1024 * 1024 + 10);
+		assertEquals(2199023255562L, teras.getBytes());
+		assertEquals(2147483648L, teras.getKibiBytes());
+		assertEquals(2097152, teras.getMebiBytes());
+		assertEquals(2048, teras.getGibiBytes());
+		assertEquals(2, teras.getTebiBytes());
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testInvalid() {
+		new MemorySize(-1);
+	}
+
+	@Test
+	public void testStandardUtils() throws IOException {
+		final MemorySize size = new MemorySize(1234567890L);
+		final MemorySize cloned = CommonTestUtils.createCopySerializable(size);
+
+		assertEquals(size, cloned);
+		assertEquals(size.hashCode(), cloned.hashCode());
+		assertEquals(size.toString(), cloned.toString());
+	}
+
+	@Test
+	public void testParseBytes() {
+		assertEquals(1234, MemorySize.parseBytes("1234"));
+		assertEquals(1234, MemorySize.parseBytes("1234b"));
+		assertEquals(1234, MemorySize.parseBytes("1234 b"));
+		assertEquals(1234, MemorySize.parseBytes("1234bytes"));
+		assertEquals(1234, MemorySize.parseBytes("1234 bytes"));
+	}
+
+	@Test
+	public void testParseKibiBytes() {
+		assertEquals(667766, MemorySize.parse("667766k").getKibiBytes());
+		assertEquals(667766, MemorySize.parse("667766 k").getKibiBytes());
+		assertEquals(667766, MemorySize.parse("667766kb").getKibiBytes());
+		assertEquals(667766, MemorySize.parse("667766 kb").getKibiBytes());
+		assertEquals(667766, MemorySize.parse("667766kibibytes").getKibiBytes());
+		assertEquals(667766, MemorySize.parse("667766 kibibytes").getKibiBytes());
+	}
+
+	@Test
+	public void testParseMebiBytes() {
+		assertEquals(7657623, MemorySize.parse("7657623m").getMebiBytes());
+		assertEquals(7657623, MemorySize.parse("7657623 m").getMebiBytes());
+		assertEquals(7657623, MemorySize.parse("7657623mb").getMebiBytes());
+		assertEquals(7657623, MemorySize.parse("7657623 mb").getMebiBytes());
+		assertEquals(7657623, MemorySize.parse("7657623mebibytes").getMebiBytes());
+		assertEquals(7657623, MemorySize.parse("7657623 mebibytes").getMebiBytes());
+	}
+
+	@Test
+	public void testParseGibiBytes() {
+		assertEquals(987654, MemorySize.parse("987654g").getGibiBytes());
+		assertEquals(987654, MemorySize.parse("987654 g").getGibiBytes());
+		assertEquals(987654, MemorySize.parse("987654gb").getGibiBytes());
+		assertEquals(987654, MemorySize.parse("987654 gb").getGibiBytes());
+		assertEquals(987654, MemorySize.parse("987654gibibytes").getGibiBytes());
+		assertEquals(987654, MemorySize.parse("987654 gibibytes").getGibiBytes());
+	}
+
+	@Test
+	public void testParseTebiBytes() {
+		assertEquals(1234567, MemorySize.parse("1234567t").getTebiBytes());
+		assertEquals(1234567, MemorySize.parse("1234567 t").getTebiBytes());
+		assertEquals(1234567, MemorySize.parse("1234567tb").getTebiBytes());
+		assertEquals(1234567, MemorySize.parse("1234567 tb").getTebiBytes());
+		assertEquals(1234567, MemorySize.parse("1234567tebibytes").getTebiBytes());
+		assertEquals(1234567, MemorySize.parse("1234567 tebibytes").getTebiBytes());
+	}
+
+	@Test
+	public void testUpperCase() {
+		assertEquals(1L, MemorySize.parse("1 B").getBytes());
+		assertEquals(1L, MemorySize.parse("1 K").getKibiBytes());
+		assertEquals(1L, MemorySize.parse("1 M").getMebiBytes());
+		assertEquals(1L, MemorySize.parse("1 G").getGibiBytes());
+		assertEquals(1L, MemorySize.parse("1 T").getTebiBytes());
+	}
+
+	@Test
+	public void testTrimBeforeParse() {
+		assertEquals(155L, MemorySize.parseBytes("      155      "));
+		assertEquals(155L, MemorySize.parseBytes("      155      bytes   "));
+	}
+
+	@Test
+	public void testParseInvalid() {
+		// null
+		try {
+			MemorySize.parseBytes(null);
+			fail("exception expected");
+		} catch (NullPointerException ignored) {}
+
+		// empty
+		try {
+			MemorySize.parseBytes("");
+			fail("exception expected");
+		} catch (IllegalArgumentException ignored) {}
+
+		// blank
+		try {
+			MemorySize.parseBytes("     ");
+			fail("exception expected");
+		} catch (IllegalArgumentException ignored) {}
+
+		// no number
+		try {
+			MemorySize.parseBytes("foobar or fubar or foo bazz");
+			fail("exception expected");
+		} catch (IllegalArgumentException ignored) {}
+
+		// wrong unit
+		try {
+			MemorySize.parseBytes("16 gjah");
+			fail("exception expected");
+		} catch (IllegalArgumentException ignored) {}
+
+		// multiple numbers
+		try {
+			MemorySize.parseBytes("16 16 17 18 bytes");
+			fail("exception expected");
+		} catch (IllegalArgumentException ignored) {}
+
+		// negative number
+		try {
+			MemorySize.parseBytes("-100 bytes");
+			fail("exception expected");
+		} catch (IllegalArgumentException ignored) {}
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testParseNumberOverflow() {
+		MemorySize.parseBytes("100000000000000000000000000000000 bytes");
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testParseNumberTimeUnitOverflow() {
+		MemorySize.parseBytes("100000000000000 tb");
+	}
+}


[10/12] flink git commit: [FLINK-6447] [docs] Update aws/emr docs

Posted by se...@apache.org.
[FLINK-6447] [docs] Update aws/emr docs

This closes #3828


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c48f9bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c48f9bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c48f9bb

Branch: refs/heads/master
Commit: 6c48f9bb0e27b86f57b940aac67db12c17b4f5bc
Parents: 50b8dda
Author: David Anderson <da...@alpinegizmo.com>
Authored: Thu May 4 17:02:48 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat May 6 19:41:53 2017 +0200

----------------------------------------------------------------------
 docs/fig/flink-on-emr.png | Bin 103880 -> 0 bytes
 docs/setup/aws.md         |  21 +++++++++++++++------
 2 files changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6c48f9bb/docs/fig/flink-on-emr.png
----------------------------------------------------------------------
diff --git a/docs/fig/flink-on-emr.png b/docs/fig/flink-on-emr.png
deleted file mode 100644
index f71c004..0000000
Binary files a/docs/fig/flink-on-emr.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/6c48f9bb/docs/setup/aws.md
----------------------------------------------------------------------
diff --git a/docs/setup/aws.md b/docs/setup/aws.md
index cee5680..bc5b221 100644
--- a/docs/setup/aws.md
+++ b/docs/setup/aws.md
@@ -32,17 +32,26 @@ Amazon Web Services offers cloud computing services on which you can run Flink.
 
 [Amazon Elastic MapReduce](https://aws.amazon.com/elasticmapreduce/) (Amazon EMR) is a web service that makes it easy to  quickly setup a Hadoop cluster. This is the **recommended way** to run Flink on AWS as it takes care of setting up everything.
 
-### Create EMR Cluster
+### Standard EMR Installation
 
-The EMR documentation contains [examples showing how to start an EMR cluster](http://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-gs-launch-sample-cluster.html). You can follow that guide and install any EMR release. You don't need to install *All Applications* part of the EMR release, but can stick to *Core Hadoop*:
+Flink is a supported application on Amazon EMR. [Amazon's documentation](http://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html)
+describes configuring Flink, creating and monitoring a cluster, and working with jobs.
 
-<img src="{{ site.baseurl }}/fig/flink-on-emr.png" class="img-responsive">
+### Custom EMR Installation
 
-When creating your cluster, make sure to setup [IAM roles](http://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-iam-roles.html) allowing you to access your S3 buckets if required.
+Amazon EMR services are regularly updated to new releases, but a version of Flink that is not yet available
+can still be installed manually in a stock EMR cluster.
 
-{% top %}
+**Create EMR Cluster**
+
+The EMR documentation contains [examples showing how to start an EMR cluster](http://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-gs-launch-sample-cluster.html). You can follow that guide and install any EMR release. You don't need to install the *All Applications* part of the EMR release, but can stick to *Core Hadoop*.
+
+{% warn Note %}
+Access to S3 buckets requires
+[configuration of IAM roles](http://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-iam-roles.html)
+when creating an EMR cluster.
 
-### Install Flink on EMR Cluster
+**Install Flink on EMR Cluster**
 
 After creating your cluster, you can [connect to the master node](http://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-connect-master-node.html) and install Flink:
 


[02/12] flink git commit: [FLINK-6401] [rocksdb] Harden RocksDB performance test

Posted by se...@apache.org.
[FLINK-6401] [rocksdb] Harden RocksDB performance test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e1b48ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e1b48ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e1b48ec

Branch: refs/heads/master
Commit: 4e1b48ec33c084c98ef68a126736c6628f6b3fa5
Parents: 179c64e
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 4 12:47:47 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat May 6 14:31:43 2017 +0200

----------------------------------------------------------------------
 .../state/benchmark/RocksDBPerformanceTest.java   | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4e1b48ec/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
index 011703e..7147583 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.contrib.streaming.state.benchmark;
 
 import org.apache.flink.core.memory.MemoryUtils;
+import org.apache.flink.testutils.junit.RetryOnFailure;
+import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Rule;
@@ -26,6 +28,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import org.rocksdb.CompactionStyle;
+import org.rocksdb.NativeLibraryLoader;
 import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksIterator;
@@ -46,9 +49,16 @@ public class RocksDBPerformanceTest extends TestLogger {
 	@Rule
 	public final TemporaryFolder TMP = new TemporaryFolder();
 
+	@Rule
+	public final RetryRule retry = new RetryRule();
+
 	@Test(timeout = 2000)
+	@RetryOnFailure(times = 3)
 	public void testRocksDbMergePerformance() throws Exception {
-		final File rocksDir = TMP.newFolder("rdb");
+		final File rocksDir = TMP.newFolder();
+
+		// ensure the RocksDB library is loaded to a distinct location each retry
+		NativeLibraryLoader.getInstance().loadLibrary(rocksDir.getAbsolutePath());
 
 		final String key = "key";
 		final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
@@ -121,8 +131,12 @@ public class RocksDBPerformanceTest extends TestLogger {
 	}
 
 	@Test(timeout = 2000)
+	@RetryOnFailure(times = 3)
 	public void testRocksDbRangeGetPerformance() throws Exception {
-		final File rocksDir = TMP.newFolder("rdb");
+		final File rocksDir = TMP.newFolder();
+
+		// ensure the RocksDB library is loaded to a distinct location each retry
+		NativeLibraryLoader.getInstance().loadLibrary(rocksDir.getAbsolutePath());
 
 		final String key = "key";
 		final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
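
As a side note, the retry pattern used in both tests combines the RetryRule with a per-test
@RetryOnFailure annotation. Below is a minimal self-contained sketch of that pattern; the test
body is a placeholder, only the rule and annotation usage come from the diff above.

    import org.apache.flink.testutils.junit.RetryOnFailure;
    import org.apache.flink.testutils.junit.RetryRule;

    import org.junit.Rule;
    import org.junit.Test;

    public class RetryPatternSketch {

        // the rule picks up the @RetryOnFailure annotation and re-runs failed tests
        @Rule
        public final RetryRule retry = new RetryRule();

        @Test(timeout = 2000)
        @RetryOnFailure(times = 3)
        public void timingSensitiveOperation() throws Exception {
            // placeholder: an operation that may occasionally exceed the timeout
            // and is therefore re-run up to 3 times before the test is reported as failed
        }
    }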


[11/12] flink git commit: [hotfix] [runtime] Migrate NetworkEnvironmentConfiguration to Java

Posted by se...@apache.org.
[hotfix] [runtime] Migrate NetworkEnvironmentConfiguration to Java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/710c08b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/710c08b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/710c08b8

Branch: refs/heads/master
Commit: 710c08b8a6e3b8888308679cf3c16761cffcae9c
Parents: 606c592
Author: Stephan Ewen <se...@apache.org>
Authored: Fri May 5 16:40:08 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat May 6 19:41:53 2017 +0200

----------------------------------------------------------------------
 .../taskexecutor/TaskManagerServices.java       |   2 +-
 .../NetworkEnvironmentConfiguration.java        | 206 +++++++++++++++++++
 .../NetworkEnvironmentConfiguration.scala       |  36 ----
 ...askManagerComponentsStartupShutdownTest.java |   2 +-
 .../runtime/taskmanager/TaskManagerTest.java    |   2 +-
 5 files changed, 209 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/710c08b8/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index ecf81d9..86a2fdf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -388,7 +388,7 @@ public class TaskManagerServices {
 			networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
 			networkEnvironmentConfiguration.partitionRequestMaxBackoff(),
 			networkEnvironmentConfiguration.networkBuffersPerChannel(),
-			networkEnvironmentConfiguration.extraNetworkBuffersPerGate());
+			networkEnvironmentConfiguration.floatingNetworkBuffersPerGate());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/710c08b8/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
new file mode 100644
index 0000000..193fd90
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+
+import javax.annotation.Nullable;
+
+/**
+ * Configuration object for the network stack.
+ */
+public class NetworkEnvironmentConfiguration {
+
+	private final float networkBufFraction;
+
+	private final long networkBufMin;
+
+	private final long networkBufMax;
+
+	private final int networkBufferSize;
+
+	private final MemoryType memoryType;
+
+	private final IOMode ioMode;
+
+	private final int partitionRequestInitialBackoff;
+
+	private final int partitionRequestMaxBackoff;
+
+	private final int networkBuffersPerChannel;
+
+	private final int floatingNetworkBuffersPerGate;
+
+	private final NettyConfig nettyConfig;
+
+	/**
+	 * Constructor for a setup with purely local communication (no netty).
+	 */
+	public NetworkEnvironmentConfiguration(
+			float networkBufFraction,
+			long networkBufMin,
+			long networkBufMax,
+			int networkBufferSize,
+			MemoryType memoryType,
+			IOMode ioMode,
+			int partitionRequestInitialBackoff,
+			int partitionRequestMaxBackoff,
+			int networkBuffersPerChannel,
+			int floatingNetworkBuffersPerGate) {
+
+		this(networkBufFraction, networkBufMin, networkBufMax, networkBufferSize,
+				memoryType, ioMode,
+				partitionRequestInitialBackoff, partitionRequestMaxBackoff,
+				networkBuffersPerChannel, floatingNetworkBuffersPerGate,
+				null);
+		
+	}
+
+	public NetworkEnvironmentConfiguration(
+			float networkBufFraction,
+			long networkBufMin,
+			long networkBufMax,
+			int networkBufferSize,
+			MemoryType memoryType,
+			IOMode ioMode,
+			int partitionRequestInitialBackoff,
+			int partitionRequestMaxBackoff,
+			int networkBuffersPerChannel,
+			int floatingNetworkBuffersPerGate,
+			@Nullable NettyConfig nettyConfig) {
+
+		this.networkBufFraction = networkBufFraction;
+		this.networkBufMin = networkBufMin;
+		this.networkBufMax = networkBufMax;
+		this.networkBufferSize = networkBufferSize;
+		this.memoryType = memoryType;
+		this.ioMode = ioMode;
+		this.partitionRequestInitialBackoff = partitionRequestInitialBackoff;
+		this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
+		this.networkBuffersPerChannel = networkBuffersPerChannel;
+		this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
+		this.nettyConfig = nettyConfig;
+	}
+
+	// ------------------------------------------------------------------------
+
+	public float networkBufFraction() {
+		return networkBufFraction;
+	}
+
+	public long networkBufMin() {
+		return networkBufMin;
+	}
+
+	public long networkBufMax() {
+		return networkBufMax;
+	}
+
+	public int networkBufferSize() {
+		return networkBufferSize;
+	}
+
+	public MemoryType memoryType() {
+		return memoryType;
+	}
+
+	public IOMode ioMode() {
+		return ioMode;
+	}
+
+	public int partitionRequestInitialBackoff() {
+		return partitionRequestInitialBackoff;
+	}
+
+	public int partitionRequestMaxBackoff() {
+		return partitionRequestMaxBackoff;
+	}
+
+	public int networkBuffersPerChannel() {
+		return networkBuffersPerChannel;
+	}
+
+	public int floatingNetworkBuffersPerGate() {
+		return floatingNetworkBuffersPerGate;
+	}
+
+	public NettyConfig nettyConfig() {
+		return nettyConfig;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		int result = 1;
+		result = 31 * result + networkBufferSize;
+		result = 31 * result + memoryType.hashCode();
+		result = 31 * result + ioMode.hashCode();
+		result = 31 * result + partitionRequestInitialBackoff;
+		result = 31 * result + partitionRequestMaxBackoff;
+		result = 31 * result + networkBuffersPerChannel;
+		result = 31 * result + floatingNetworkBuffersPerGate;
+		result = 31 * result + (nettyConfig != null ? nettyConfig.hashCode() : 0);
+		return result;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (this == obj) {
+			return true;
+		}
+		else if (obj == null || getClass() != obj.getClass()) {
+			return false;
+		}
+		else {
+			final NetworkEnvironmentConfiguration that = (NetworkEnvironmentConfiguration) obj;
+
+			return this.networkBufFraction == that.networkBufFraction &&
+					this.networkBufMin == that.networkBufMin &&
+					this.networkBufMax == that.networkBufMax &&
+					this.networkBufferSize == that.networkBufferSize &&
+					this.partitionRequestInitialBackoff == that.partitionRequestInitialBackoff &&
+					this.partitionRequestMaxBackoff == that.partitionRequestMaxBackoff &&
+					this.networkBuffersPerChannel == that.networkBuffersPerChannel &&
+					this.floatingNetworkBuffersPerGate == that.floatingNetworkBuffersPerGate &&
+					this.memoryType == that.memoryType &&
+					this.ioMode == that.ioMode && 
+					(nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null);
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "NetworkEnvironmentConfiguration{" +
+				"networkBufFraction=" + networkBufFraction +
+				", networkBufMin=" + networkBufMin +
+				", networkBufMax=" + networkBufMax +
+				", networkBufferSize=" + networkBufferSize +
+				", memoryType=" + memoryType +
+				", ioMode=" + ioMode +
+				", partitionRequestInitialBackoff=" + partitionRequestInitialBackoff +
+				", partitionRequestMaxBackoff=" + partitionRequestMaxBackoff +
+				", networkBuffersPerChannel=" + networkBuffersPerChannel +
+				", floatingNetworkBuffersPerGate=" + floatingNetworkBuffersPerGate +
+				", nettyConfig=" + nettyConfig +
+				'}';
+	}
+}
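
For orientation, a construction sketch for the new class using the purely local (no netty)
constructor added above. The numbers are illustrative and mirror the defaults from the
TaskManagerOptions commit later in this batch; this is not code from the commit itself.

    import org.apache.flink.core.memory.MemoryType;
    import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
    import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;

    public class NetworkConfigSketch {

        public static void main(String[] args) {
            // purely local setup, hence the constructor without a NettyConfig
            NetworkEnvironmentConfiguration conf = new NetworkEnvironmentConfiguration(
                    0.1f,          // networkBufFraction
                    64L << 20,     // networkBufMin  (64 MB)
                    1024L << 20,   // networkBufMax  (1 GB)
                    32 * 1024,     // networkBufferSize
                    MemoryType.HEAP,
                    IOMode.SYNC,
                    100,           // partitionRequestInitialBackoff
                    10_000,        // partitionRequestMaxBackoff
                    2,             // networkBuffersPerChannel
                    8);            // floatingNetworkBuffersPerGate

            System.out.println(conf.floatingNetworkBuffersPerGate());  // 8
            System.out.println(conf);  // uses the toString() defined above
        }
    }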

http://git-wip-us.apache.org/repos/asf/flink/blob/710c08b8/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
deleted file mode 100644
index d74bb3b..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager
-
-import org.apache.flink.core.memory.MemoryType
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
-import org.apache.flink.runtime.io.network.netty.NettyConfig
-
-case class NetworkEnvironmentConfiguration(
-    networkBufFraction: Float,
-    networkBufMin: Long,
-    networkBufMax: Long,
-    networkBufferSize: Int,
-    memoryType: MemoryType,
-    ioMode: IOMode,
-    partitionRequestInitialBackoff : Int,
-    partitionRequestMaxBackoff : Int,
-    networkBuffersPerChannel: Int,
-    extraNetworkBuffersPerGate: Int,
-    nettyConfig: NettyConfig = null)

http://git-wip-us.apache.org/repos/asf/flink/blob/710c08b8/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 7837b27..2a4c036 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -145,7 +145,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 				netConf.partitionRequestInitialBackoff(),
 				netConf.partitionRequestMaxBackoff(),
 				netConf.networkBuffersPerChannel(),
-				netConf.extraNetworkBuffersPerGate());
+				netConf.floatingNetworkBuffersPerGate());
 
 			network.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/710c08b8/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 0f5afc0..e790ea8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -1047,7 +1047,7 @@ public class TaskManagerTest extends TestLogger {
 		assertEquals(tmConfig.getNetworkConfig().partitionRequestInitialBackoff(), 100);
 		assertEquals(tmConfig.getNetworkConfig().partitionRequestMaxBackoff(), 200);
 		assertEquals(tmConfig.getNetworkConfig().networkBuffersPerChannel(), 10);
-		assertEquals(tmConfig.getNetworkConfig().extraNetworkBuffersPerGate(), 100);
+		assertEquals(tmConfig.getNetworkConfig().floatingNetworkBuffersPerGate(), 100);
 	}
 
 	/**


[12/12] flink git commit: [hotfix] [config] Harmonize configuration keys for TaskManager network settings.

Posted by se...@apache.org.
[hotfix] [config] Harmonize configuration keys for TaskManager network settings.

This preserves old config keys as deprecated keys where the key was already present
in an earlier release.

This also re-arranges config options to form logical sections in the file
and harmonizes the JavaDoc formatting style.
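
As a quick orientation for the diff below, this is the declaration/lookup pattern used to keep old
keys working. The key names are taken from the diff; everything else is an illustrative sketch,
not the committed code.

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;
    import org.apache.flink.configuration.Configuration;

    public class DeprecatedKeySketch {

        // new key, with the pre-existing key registered as a deprecated alias
        public static final ConfigOption<Integer> NUM_ARENAS = ConfigOptions
                .key("taskmanager.network.netty.num-arenas")
                .defaultValue(-1)
                .withDeprecatedKeys("taskmanager.net.num-arenas");

        public static void main(String[] args) {
            Configuration config = new Configuration();

            // a value set under the deprecated key is still picked up by the new option
            config.setInteger("taskmanager.net.num-arenas", 4);
            System.out.println(config.getInteger(NUM_ARENAS));              // 4

            // without any value, the declared default applies
            System.out.println(new Configuration().getInteger(NUM_ARENAS)); // -1
        }
    }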


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aed3b806
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aed3b806
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aed3b806

Branch: refs/heads/master
Commit: aed3b806461114e04e9d6c3c0f27bc75eefa8f47
Parents: 710c08b
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 4 21:40:26 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat May 6 19:41:53 2017 +0200

----------------------------------------------------------------------
 .../flink/configuration/TaskManagerOptions.java | 100 ++++++++------
 .../runtime/io/network/netty/NettyConfig.java   | 136 ++++++++-----------
 2 files changed, 116 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aed3b806/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index c5063d1..8480045 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -29,22 +29,41 @@ import static org.apache.flink.configuration.ConfigOptions.key;
 public class TaskManagerOptions {
 
 	// ------------------------------------------------------------------------
-	//  TaskManager Options
+	//  General TaskManager Options
 	// ------------------------------------------------------------------------
 
 	// @TODO Migrate 'taskmanager.*' config options from ConfigConstants
-	
-	/** Whether to kill the TaskManager when the task thread throws an OutOfMemoryError */
-	public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
-			key("taskmanager.jvm-exit-on-oom")
-			.defaultValue(false);
 
-	/** JVM heap size (in megabytes) for the TaskManagers */
+	/**
+	 * JVM heap size (in megabytes) for the TaskManagers
+	 */
 	public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY =
 			key("taskmanager.heap.mb")
 			.defaultValue(1024);
-		   
-	/** Size of memory buffers used by the network stack and the memory manager (in bytes). */
+
+	/**
+	 * Whether to kill the TaskManager when the task thread throws an OutOfMemoryError
+	 */
+	public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
+			key("taskmanager.jvm-exit-on-oom")
+			.defaultValue(false);
+
+	/**
+	 * Whether the quarantine monitor for task managers shall be started. The quarantine monitor
+	 * shuts down the actor system if it detects that it has quarantined another actor system
+	 * or if it has been quarantined by another actor system.
+	 */
+	public static final ConfigOption<Boolean> EXIT_ON_FATAL_AKKA_ERROR =
+			key("taskmanager.exit-on-fatal-akka-error")
+			.defaultValue(false);
+
+	// ------------------------------------------------------------------------
+	//  Managed Memory Options
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Size of memory buffers used by the network stack and the memory manager (in bytes).
+	 */
 	public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
 			key("taskmanager.memory.segment-size")
 			.defaultValue(32768);
@@ -73,7 +92,9 @@ public class TaskManagerOptions {
 			key("taskmanager.memory.off-heap")
 			.defaultValue(false);
 
-	/** Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting. */
+	/**
+	 * Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.
+	 */
 	public static final ConfigOption<Boolean> MANAGED_MEMORY_PRE_ALLOCATE =
 			key("taskmanager.memory.preallocate")
 			.defaultValue(false);
@@ -94,53 +115,65 @@ public class TaskManagerOptions {
 			key("taskmanager.network.numberOfBuffers")
 			.defaultValue(2048);
 
-	/** Fraction of JVM memory to use for network buffers. */
+	/**
+	 * Fraction of JVM memory to use for network buffers.
+	 */
 	public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION =
 			key("taskmanager.network.memory.fraction")
 			.defaultValue(0.1f);
 
-	/** Minimum memory size for network buffers (in bytes) */
+	/**
+	 * Minimum memory size for network buffers (in bytes)
+	 */
 	public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MIN =
 			key("taskmanager.network.memory.min")
 			.defaultValue(64L << 20); // 64 MB
 
-	/** Maximum memory size for network buffers (in bytes) */
+	/**
+	 * Maximum memory size for network buffers (in bytes)
+	 */
 	public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MAX =
 			key("taskmanager.network.memory.max")
 			.defaultValue(1024L << 20); // 1 GB
 
-
-	/** Minimum backoff for partition requests of input channels. */
-	public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
-			key("taskmanager.net.request-backoff.initial")
-			.defaultValue(100);
-
-	/** Maximum backoff for partition requests of input channels. */
-	public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
-			key("taskmanager.net.request-backoff.max")
-			.defaultValue(10000);
-
-
 	/**
 	 * Number of network buffers to use for each outgoing/ingoing channel (subpartition/input channel).
 	 *
 	 * Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization
 	 */
 	public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
-		key("taskmanager.net.memory.buffers-per-channel")
+			key("taskmanager.network.memory.buffers-per-channel")
 			.defaultValue(2);
 
-	/** Number of extra network buffers to use for each outgoing/ingoing gate (result partition/input gate). */
+	/**
+	 * Number of extra network buffers to use for each outgoing/ingoing gate (result partition/input gate).
+	 */
 	public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
-		key("taskmanager.net.memory.extra-buffers-per-gate")
+			key("taskmanager.network.memory.floating-buffers-per-gate")
 			.defaultValue(8);
 
 	/**
+	 * Minimum backoff for partition requests of input channels.
+	 */
+	public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
+			key("taskmanager.network.request-backoff.initial")
+			.defaultValue(100)
+			.withDeprecatedKeys("taskmanager.net.request-backoff.initial");
+
+	/**
+	 * Maximum backoff for partition requests of input channels.
+	 */
+	public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
+			key("taskmanager.network.request-backoff.max")
+			.defaultValue(10000)
+			.withDeprecatedKeys("taskmanager.net.request-backoff.max");
+
+	/**
 	 * Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue
 	 * lengths.
 	 */
 	public static final ConfigOption<Boolean> NETWORK_DETAILED_METRICS =
-			key("taskmanager.net.detailed-metrics")
+			key("taskmanager.network.detailed-metrics")
 			.defaultValue(false);
 
 	// ------------------------------------------------------------------------
@@ -176,15 +209,6 @@ public class TaskManagerOptions {
 			key("task.checkpoint.alignment.max-size")
 			.defaultValue(-1L);
 
-	/**
-	 * Whether the quarantine monitor for task managers shall be started. The quarantine monitor
-	 * shuts down the actor system if it detects that it has quarantined another actor system
-	 * or if it has been quarantined by another actor system.
-	 */
-	public static final ConfigOption<Boolean> EXIT_ON_FATAL_AKKA_ERROR =
-		key("taskmanager.exit-on-fatal-akka-error")
-		.defaultValue(false);
-
 	// ------------------------------------------------------------------------
 
 	/** Not intended to be instantiated */

http://git-wip-us.apache.org/repos/asf/flink/blob/aed3b806/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index b9a1b90..e716a82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.slf4j.Logger;
@@ -38,19 +40,40 @@ public class NettyConfig {
 
 	// - Config keys ----------------------------------------------------------
 
-	public static final String NUM_ARENAS = "taskmanager.net.num-arenas";
-
-	public static final String NUM_THREADS_SERVER = "taskmanager.net.server.numThreads";
-
-	public static final String NUM_THREADS_CLIENT = "taskmanager.net.client.numThreads";
-
-	public static final String CONNECT_BACKLOG = "taskmanager.net.server.backlog";
-
-	public static final String CLIENT_CONNECT_TIMEOUT_SECONDS = "taskmanager.net.client.connectTimeoutSec";
-
-	public static final String SEND_RECEIVE_BUFFER_SIZE = "taskmanager.net.sendReceiveBufferSize";
-
-	public static final String TRANSPORT_TYPE = "taskmanager.net.transport";
+	public static final ConfigOption<Integer> NUM_ARENAS = ConfigOptions
+			.key("taskmanager.network.netty.num-arenas")
+			.defaultValue(-1)
+			.withDeprecatedKeys("taskmanager.net.num-arenas");
+
+	public static final ConfigOption<Integer> NUM_THREADS_SERVER = ConfigOptions
+			.key("taskmanager.network.netty.server.numThreads")
+			.defaultValue(-1)
+			.withDeprecatedKeys("taskmanager.net.server.numThreads");
+
+	public static final ConfigOption<Integer> NUM_THREADS_CLIENT = ConfigOptions
+			.key("taskmanager.network.netty.client.numThreads")
+			.defaultValue(-1)
+			.withDeprecatedKeys("taskmanager.net.client.numThreads");
+
+	public static final ConfigOption<Integer> CONNECT_BACKLOG = ConfigOptions
+			.key("taskmanager.network.netty.server.backlog")
+			.defaultValue(0) // default: 0 => Netty's default
+			.withDeprecatedKeys("taskmanager.net.server.backlog");
+
+	public static final ConfigOption<Integer> CLIENT_CONNECT_TIMEOUT_SECONDS = ConfigOptions
+			.key("taskmanager.network.netty.client.connectTimeoutSec")
+			.defaultValue(120) // default: 120s = 2min
+			.withDeprecatedKeys("taskmanager.net.client.connectTimeoutSec");
+
+	public static final ConfigOption<Integer> SEND_RECEIVE_BUFFER_SIZE = ConfigOptions
+			.key("taskmanager.network.netty.sendReceiveBufferSize")
+			.defaultValue(0) // default: 0 => Netty's default
+			.withDeprecatedKeys("taskmanager.net.sendReceiveBufferSize");
+
+	public static final ConfigOption<String> TRANSPORT_TYPE = ConfigOptions
+			.key("taskmanager.network.netty.transport")
+			.defaultValue("nio")
+			.withDeprecatedKeys("taskmanager.net.transport");
 
 	// ------------------------------------------------------------------------
 
@@ -112,100 +135,49 @@ public class NettyConfig {
 	}
 
 	// ------------------------------------------------------------------------
-	// Setters
-	// ------------------------------------------------------------------------
-
-	public NettyConfig setServerConnectBacklog(int connectBacklog) {
-		checkArgument(connectBacklog >= 0);
-		config.setInteger(CONNECT_BACKLOG, connectBacklog);
-
-		return this;
-	}
-
-	public NettyConfig setServerNumThreads(int numThreads) {
-		checkArgument(numThreads >= 0);
-		config.setInteger(NUM_THREADS_SERVER, numThreads);
-
-		return this;
-	}
-
-	public NettyConfig setClientNumThreads(int numThreads) {
-		checkArgument(numThreads >= 0);
-		config.setInteger(NUM_THREADS_CLIENT, numThreads);
-
-		return this;
-	}
-
-	public NettyConfig setClientConnectTimeoutSeconds(int connectTimeoutSeconds) {
-		checkArgument(connectTimeoutSeconds >= 0);
-		config.setInteger(CLIENT_CONNECT_TIMEOUT_SECONDS, connectTimeoutSeconds);
-
-		return this;
-	}
-
-	public NettyConfig setSendAndReceiveBufferSize(int bufferSize) {
-		checkArgument(bufferSize >= 0);
-		config.setInteger(SEND_RECEIVE_BUFFER_SIZE, bufferSize);
-
-		return this;
-	}
-
-	public NettyConfig setTransportType(String transport) {
-		if (transport.equals("nio") || transport.equals("epoll") || transport.equals("auto")) {
-			config.setString(TRANSPORT_TYPE, transport);
-		}
-		else {
-			throw new IllegalArgumentException("Unknown transport type.");
-		}
-
-		return this;
-	}
-
-	// ------------------------------------------------------------------------
 	// Getters
 	// ------------------------------------------------------------------------
 
 	public int getServerConnectBacklog() {
-		// default: 0 => Netty's default
-		return config.getInteger(CONNECT_BACKLOG, 0);
+		return config.getInteger(CONNECT_BACKLOG);
 	}
 
 	public int getNumberOfArenas() {
 		// default: number of slots
-		return config.getInteger(NUM_ARENAS, numberOfSlots);
+		final int configValue = config.getInteger(NUM_ARENAS);
+		return configValue == -1 ? numberOfSlots : configValue;
 	}
 
 	public int getServerNumThreads() {
 		// default: number of task slots
-		return config.getInteger(NUM_THREADS_SERVER, numberOfSlots);
+		final int configValue = config.getInteger(NUM_THREADS_SERVER);
+		return configValue == -1 ? numberOfSlots : configValue;
 	}
 
 	public int getClientNumThreads() {
 		// default: number of task slots
-		return config.getInteger(NUM_THREADS_CLIENT, numberOfSlots);
+		final int configValue = config.getInteger(NUM_THREADS_CLIENT);
+		return configValue == -1 ? numberOfSlots : configValue;
 	}
 
 	public int getClientConnectTimeoutSeconds() {
-		// default: 120s = 2min
-		return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS, 120);
+		return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS);
 	}
 
 	public int getSendAndReceiveBufferSize() {
-		// default: 0 => Netty's default
-		return config.getInteger(SEND_RECEIVE_BUFFER_SIZE, 0);
+		return config.getInteger(SEND_RECEIVE_BUFFER_SIZE);
 	}
 
 	public TransportType getTransportType() {
-		String transport = config.getString(TRANSPORT_TYPE, "nio");
-
-		if (transport.equals("nio")) {
-			return TransportType.NIO;
-		}
-		else if (transport.equals("epoll")) {
-			return TransportType.EPOLL;
-		}
-		else {
-			return TransportType.AUTO;
+		String transport = config.getString(TRANSPORT_TYPE);
+
+		switch (transport) {
+			case "nio":
+				return TransportType.NIO;
+			case "epoll":
+				return TransportType.EPOLL;
+			default:
+				return TransportType.AUTO;
 		}
 	}
 


[04/12] flink git commit: [hotfix] fix typo in taskmanager.sh usage string

Posted by se...@apache.org.
[hotfix] fix typo in taskmanager.sh usage string


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac724507
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac724507
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac724507

Branch: refs/heads/master
Commit: ac7245072326d3a60ed106816b4adc5bcc9651b1
Parents: 546d0aa
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Apr 11 15:29:41 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat May 6 14:31:44 2017 +0200

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/bin/taskmanager.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ac724507/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index 3bac51f..8431408 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -18,7 +18,7 @@
 ################################################################################
 
 # Start/stop a Flink TaskManager.
-USAGE="Usage: taskmanager.sh start|start-foreground|stop|stop-all)"
+USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"
 
 STARTSTOP=$1
 


[07/12] flink git commit: [FLINK-4545] [network] (followup) Replace awk lshift by multiplication

Posted by se...@apache.org.
[FLINK-4545] [network] (followup) Replace awk lshift by multiplication

'lshift(...)' is not defined by default in some commonly used awk versions.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/606c5920
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/606c5920
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/606c5920

Branch: refs/heads/master
Commit: 606c592078799852bf4029b261d4c0c55bcff19b
Parents: 0bb49e5
Author: Stephan Ewen <se...@apache.org>
Authored: Sat May 6 19:39:49 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat May 6 19:40:37 2017 +0200

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/bin/config.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/606c5920/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 7e3c1d4..d73b220 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -533,7 +533,7 @@ calculateNetworkBufferMemory() {
             exit 1
         fi
 
-        network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
+        network_buffers_bytes=`awk "BEGIN { x = ${FLINK_TM_HEAP} * 1048576 * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
     fi
 
     # recalculate the JVM heap memory by taking the network buffers into account


[05/12] flink git commit: [FLINK-4545] [network] replace the network buffers parameter

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 3db91b7..8677307 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -363,11 +363,6 @@ class LocalFlinkMiniCluster(
     if (config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE) ==
         TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) {
 
-      val bufferSize: Int = config.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE)
-
-      val bufferMem: Long = config.getInteger(
-        TaskManagerOptions.NETWORK_NUM_BUFFERS).toLong * bufferSize.toLong
-
       val numTaskManager = config.getInteger(
         ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
         ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER)
@@ -381,10 +376,10 @@ class LocalFlinkMiniCluster(
       // each TaskManagers and each JobManager
       memorySize /= numTaskManager + 1 // the +1 is the job manager
 
-      // for each TaskManager, subtract the memory needed for memory buffers
-      memorySize -= bufferMem
+      // for each TaskManager, subtract the memory needed for network memory buffers
+      memorySize -= TaskManagerServices.calculateNetworkBufferMemory(memorySize, config)
       memorySize = (memorySize * memoryFraction).toLong
-      memorySize >>>= 20 // bytes to megabytes
+      memorySize >>= 20 // bytes to megabytes
       config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, memorySize)
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index 4ecfe59..d74bb3b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -23,7 +23,9 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 
 case class NetworkEnvironmentConfiguration(
-    numNetworkBuffers: Int,
+    networkBufFraction: Float,
+    networkBufMin: Long,
+    networkBufMax: Long,
     networkBufferSize: Int,
     memoryType: MemoryType,
     ioMode: IOMode,

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java
new file mode 100644
index 0000000..ddd6462
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit test for {@link TaskManagerServicesConfiguration}.
+ */
+public class TaskManagerServicesConfigurationTest {
+	/**
+	 * Verifies that {@link TaskManagerServicesConfiguration#hasNewNetworkBufConf(Configuration)}
+	 * returns the correct result for old configurations via
+	 * {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+	 */
+	@SuppressWarnings("deprecation")
+	@Test
+	public void hasNewNetworkBufConfOld() throws Exception {
+		Configuration config = new Configuration();
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+
+		assertFalse(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
+	}
+
+	/**
+	 * Verifies that {@link TaskManagerServicesConfiguration#hasNewNetworkBufConf(Configuration)}
+	 * returns the correct result for new configurations via
+	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and {@link
+	 * TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+	 */
+	@Test
+	public void hasNewNetworkBufConfNew() throws Exception {
+		Configuration config = new Configuration();
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
+
+		// fully defined:
+		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1024);
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 2048);
+
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
+
+		// partly defined:
+		config = new Configuration();
+		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1024);
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
+
+		config = new Configuration();
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1024);
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
+		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
+
+		config = new Configuration();
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1024);
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1024);
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
+	}
+
+	/**
+	 * Verifies that {@link TaskManagerServicesConfiguration#hasNewNetworkBufConf(Configuration)}
+	 * returns the correct result for mixed old/new configurations.
+	 */
+	@SuppressWarnings("deprecation")
+	@Test
+	public void hasNewNetworkBufConfMixed() throws Exception {
+		Configuration config = new Configuration();
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
+
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+		assertFalse(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
+
+		// old + 1 new parameter = new:
+		Configuration config1 = config.clone();
+		config1.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config1));
+
+		config1 = config.clone();
+		config1.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1024);
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config1));
+
+		config1 = config.clone();
+		config1.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1024);
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config1));
+	}
+
+}
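
For reference, a minimal sketch (not the implementation added by this commit) of the decision
logic the tests above pin down. The helper name and the use of Configuration#containsKey are
illustrative assumptions only; the fragment would slot into the test class above, which already
imports Configuration and TaskManagerOptions:

	// Hypothetical sketch of the semantics these tests assert: the old NETWORK_NUM_BUFFERS
	// key wins only if it is set explicitly and none of the new fraction/min/max options
	// are present; an empty configuration counts as "new".
	static boolean hasNewNetworkBufConfSketch(Configuration config) {
		boolean hasNewKeys =
			config.containsKey(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key()) ||
			config.containsKey(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key()) ||
			config.containsKey(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key());
		boolean hasOldKey = config.containsKey(TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
		return hasNewKeys || !hasOldKey;
	}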

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
new file mode 100644
index 0000000..bf90634
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.net.InetAddress;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Unit test for {@link TaskManagerServices}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EnvironmentInformation.class)
+public class TaskManagerServicesTest {
+
+	/**
+	 * Test for {@link TaskManagerServices#calculateNetworkBufferMemory(long, Configuration)} using old
+	 * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+	 */
+	@SuppressWarnings("deprecation")
+	@Test
+	public void calculateNetworkBufOld() throws Exception {
+		Configuration config = new Configuration();
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+
+		// note: actual network buffer memory size is independent of the totalJavaMemorySize
+		assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+			TaskManagerServices.calculateNetworkBufferMemory(10L << 20, config));
+		assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+			TaskManagerServices.calculateNetworkBufferMemory(64L << 20, config));
+
+		// test integer overflow in the memory size
+		int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
+		assertEquals(2L << 32, TaskManagerServices.calculateNetworkBufferMemory(2L << 33, config));
+	}
+
+	/**
+	 * Test for {@link TaskManagerServices#calculateNetworkBufferMemory(long, Configuration)} using new
+	 * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
+	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+	 */
+	@Test
+	public void calculateNetworkBufNew() throws Exception {
+		Configuration config = new Configuration();
+
+		// (1) defaults
+		final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+		final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
+		final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
+		assertEquals(enforceBounds((long) (defaultFrac * (10L << 20)), defaultMin, defaultMax),
+			TaskManagerServices.calculateNetworkBufferMemory((64L << 20 + 1), config));
+		assertEquals(enforceBounds((long) (defaultFrac * (10L << 30)), defaultMin, defaultMax),
+			TaskManagerServices.calculateNetworkBufferMemory((10L << 30), config));
+
+		calculateNetworkBufNew(config);
+	}
+
+	/**
+	 * Helper to test {@link TaskManagerServices#calculateNetworkBufferMemory(long, Configuration)} with the
+	 * new configuration parameters.
+	 *
+	 * @param config configuration object
+	 */
+	private static void calculateNetworkBufNew(final Configuration config) {
+		// (2) fixed size memory
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1L << 20); // 1MB
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 20); // 1MB
+
+		// note: actual network buffer memory size is independent of the totalJavaMemorySize
+		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBufferMemory(10L << 20, config));
+		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBufferMemory(64L << 20, config));
+		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBufferMemory(1L << 30, config));
+
+		// (3) random fraction, min, and max values
+		Random ran = new Random();
+		for (int i = 0; i < 1_000; ++i){
+			float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
+			config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
+
+			long min = Math.max(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), ran.nextLong());
+			config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, min);
+
+			long max = Math.max(min, ran.nextLong());
+			config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, max);
+
+			long javaMem = Math.max(max + 1, ran.nextLong());
+
+			final long networkBufMem = TaskManagerServices.calculateNetworkBufferMemory(javaMem, config);
+			assertTrue("Lower bound not met with configuration: " + config.toString(),
+				networkBufMem >= min);
+			assertTrue("Upper bound not met with configuration: " + config.toString(),
+				networkBufMem <= max);
+			if (networkBufMem > min && networkBufMem < max) {
+				assertEquals(
+					"Wrong network buffer memory size with configuration: " + config.toString(),
+					(long) (javaMem * frac), networkBufMem);
+			}
+		}
+	}
+
+	/**
+	 * Test for {@link TaskManagerServices#calculateNetworkBufferMemory(long, Configuration)} using mixed
+	 * old/new configurations.
+	 */
+	@SuppressWarnings("deprecation")
+	@Test
+	public void calculateNetworkBufMixed() throws Exception {
+		Configuration config = new Configuration();
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+
+		final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+		final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
+		final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
+
+		// old + 1 new parameter = new:
+		Configuration config1 = config.clone();
+		config1.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+		assertEquals(enforceBounds((long) (0.1f * (10L << 20)), defaultMin, defaultMax),
+			TaskManagerServices.calculateNetworkBufferMemory((64L << 20 + 1), config1));
+		assertEquals(enforceBounds((long) (0.1f * (10L << 30)), defaultMin, defaultMax),
+			TaskManagerServices.calculateNetworkBufferMemory((10L << 30), config1));
+
+		config1 = config.clone();
+		long newMin = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(); // smallest value possible
+		config1.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, newMin);
+		assertEquals(enforceBounds((long) (defaultFrac * (10L << 20)), newMin, defaultMax),
+			TaskManagerServices.calculateNetworkBufferMemory((10L << 20), config1));
+		assertEquals(enforceBounds((long) (defaultFrac * (10L << 30)), newMin, defaultMax),
+			TaskManagerServices.calculateNetworkBufferMemory((10L << 30), config1));
+
+		config1 = config.clone();
+		long newMax = Math.max(64L << 20 + 1, TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue());
+		config1.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, newMax);
+		assertEquals(enforceBounds((long) (defaultFrac * (10L << 20)), defaultMin, newMax),
+			TaskManagerServices.calculateNetworkBufferMemory((64L << 20 + 1), config1));
+		assertEquals(enforceBounds((long) (defaultFrac * (10L << 30)), defaultMin, newMax),
+			TaskManagerServices.calculateNetworkBufferMemory((10L << 30), config1));
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config1));
+
+		// old + any new parameter = new:
+		calculateNetworkBufNew(config);
+	}
+
+	/**
+	 * Returns the value or the lower/upper bound in case the value is less/greater than the lower/upper bound, respectively.
+	 *
+	 * @param value value to inspect
+	 * @param lower lower bound
+	 * @param upper upper bound
+	 *
+	 * @return <tt>min(upper, max(lower, value))</tt>
+	 */
+	private static long enforceBounds(final long value, final long lower, final long upper) {
+		return Math.min(upper, Math.max(lower, value));
+	}
+
+	/**
+	 * Test for {@link TaskManagerServices#calculateNetworkBufferMemory(TaskManagerServicesConfiguration)}
+	 * using the same (manual) test cases as in {@link #calculateHeapSizeMB()}.
+	 */
+	@Test
+	public void calculateNetworkBufFromHeapSize() throws Exception {
+		PowerMockito.mockStatic(EnvironmentInformation.class);
+		// some defaults:
+		when(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()).thenReturn(1000L << 20); // 1000MB
+		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(1000L << 20); // 1000MB
+
+		TaskManagerServicesConfiguration tmConfig;
+
+		tmConfig = getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(),
+			TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
+			0.1f, 60L << 20, 1L << 30, MemoryType.HEAP);
+		when(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()).thenReturn(1000L << 20); // 1000MB
+		assertEquals(100L << 20, TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
+
+		tmConfig = getTmConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
+			0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
+		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(890L << 20); // 890MB
+		assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
+			TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
+
+		tmConfig = getTmConfig(-1, 0.1f,
+			0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
+		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(810L << 20); // 810MB
+		assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
+			TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
+	}
+
+	/**
+	 * Returns a task manager services configuration for the tests.
+	 *
+	 * @param managedMemory         see {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}
+	 * @param managedMemoryFraction see {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION}
+	 * @param networkBufFraction	see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION}
+	 * @param networkBufMin			see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN}
+	 * @param networkBufMax			see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}
+	 * @param memType				on-heap or off-heap
+	 *
+	 * @return configuration object
+	 */
+	private static TaskManagerServicesConfiguration getTmConfig(
+		final long managedMemory, final float managedMemoryFraction, float networkBufFraction,
+		long networkBufMin, long networkBufMax,
+		final MemoryType memType) {
+
+		final NetworkEnvironmentConfiguration networkConfig = new NetworkEnvironmentConfiguration(
+			networkBufFraction,
+			networkBufMin,
+			networkBufMax,
+			TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(),
+			memType,
+			null,
+			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
+			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
+			TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),
+			TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(),
+			null);
+
+		return new TaskManagerServicesConfiguration(
+			mock(InetAddress.class),
+			new String[] {},
+			networkConfig,
+			QueryableStateConfiguration.disabled(),
+			1,
+			managedMemory,
+			false,
+			managedMemoryFraction,
+			mock(MetricRegistryConfiguration.class),
+			0);
+	}
+
+	/**
+	 * Test for {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)} with some
+	 * manually calculated scenarios.
+	 */
+	@Test
+	public void calculateHeapSizeMB() throws Exception {
+		Configuration config = new Configuration();
+		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 64L << 20); // 64MB
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 30); // 1GB
+
+		config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
+		assertEquals(1000, TaskManagerServices.calculateHeapSizeMB(1000, config));
+
+		config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 10); // 10MB
+		assertEquals(890, TaskManagerServices.calculateHeapSizeMB(1000, config));
+
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1); // use fraction of given memory
+		config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.1f); // 10%
+		assertEquals(810, TaskManagerServices.calculateHeapSizeMB(1000, config));
+	}
+
+}
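
As a reading aid (not the production code), this is the sizing rule the assertions above imply
for the new options, assuming the fraction of the total Java memory is clamped exactly as the
enforceBounds() helper above does; the method name is made up for illustration and the fragment
relies on the imports already present in the test class:

	// Illustrative sketch: a fraction of the total memory, clamped to the configured [min, max].
	static long networkBufferMemorySketch(long totalJavaMemory, Configuration config) {
		float frac = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
		long min = config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN);
		long max = config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX);
		return Math.min(max, Math.max(min, (long) (totalJavaMemory * frac)));
	}

For example, with the 10% fraction, 64MB minimum and 1GB maximum used in calculateHeapSizeMB()
above, a total Java memory of 2GB yields roughly 200MB of network buffer memory, which lies
inside the bounds and is therefore not clamped.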

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 973fddf..7837b27 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -121,18 +121,21 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 				config,
 				false); // exit-jvm-on-fatal-error
 
+			final int networkBufNum = 32;
+			// note: the network buffer memory configured here is not actually used below but set
+			// accordingly to be consistent
 			final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
-					32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC,
+					0.1f, networkBufNum * BUFFER_SIZE, networkBufNum * BUFFER_SIZE, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC,
 					0, 0, 2, 8, null);
 
 			ResourceID taskManagerId = ResourceID.generate();
 			
 			final TaskManagerLocation connectionInfo = new TaskManagerLocation(taskManagerId, InetAddress.getLocalHost(), 10000);
 
-			final MemoryManager memManager = new MemoryManager(32 * BUFFER_SIZE, 1, BUFFER_SIZE, MemoryType.HEAP, false);
+			final MemoryManager memManager = new MemoryManager(networkBufNum * BUFFER_SIZE, 1, BUFFER_SIZE, MemoryType.HEAP, false);
 			final IOManager ioManager = new IOManagerAsync(TMP_DIR);
 			final NetworkEnvironment network = new NetworkEnvironment(
-				new NetworkBufferPool(netConf.numNetworkBuffers(), netConf.networkBufferSize(), netConf.memoryType()),
+				new NetworkBufferPool(32, netConf.networkBufferSize(), netConf.memoryType()),
 				new LocalConnectionManager(),
 				new ResultPartitionManager(),
 				new TaskEventDispatcher(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index dbef01f..fea2b79 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -103,6 +103,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
 		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L);
+		// the default network buffer size (10% of heap max =~ 150MB) seems too much for this test case
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 80L << 20); // 80 MB
 
 		cluster = new LocalFlinkMiniCluster(config, false);
 		cluster.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
index 557fa38..6ce8d17 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
@@ -66,7 +66,7 @@ public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
 
 	/**
 	 * Dynamic properties allow the user to specify additional configuration values with -D, such as
-	 *  -D fs.overwrite-files=true  -D taskmanager.network.numberOfBuffers=16368
+	 * <tt> -Dfs.overwrite-files=true  -Dtaskmanager.network.memory.min=536346624</tt>
 	 */
 	private final Option DYNAMIC_PROPERTIES;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 9277d21..3d82132 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -119,7 +119,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 
 	/**
 	 * Dynamic properties allow the user to specify additional configuration values with -D, such as
-	 *  -D fs.overwrite-files=true  -D taskmanager.network.numberOfBuffers=16368
+	 * <tt> -Dfs.overwrite-files=true  -Dtaskmanager.network.memory.min=536346624</tt>
 	 */
 	private final Option DYNAMIC_PROPERTIES;