Posted to issues@flink.apache.org by NicoK <gi...@git.apache.org> on 2017/08/09 14:39:33 UTC

[GitHub] flink pull request #4506: [FLINK-7400][cluster] fix off-heap limits set to c...

GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/4506

    [FLINK-7400][cluster] fix off-heap limits set too conservatively in cluster environments

    ## What is the purpose of the change
    
    Since #3648, `ContaineredTaskManagerParameters` sets `offHeapSize` to the amount of memory Flink itself will use off-heap. In various cases, e.g. YARN or Mesos, this value is then set as the value for `-XX:MaxDirectMemorySize`. It does not account for any off-heap use by components other than Flink, e.g. RocksDB, other libraries, or the JVM itself.
    
    Please note that this affects at least all batch programs with the following options set (which do not make much sense for streaming):
    ```
    taskmanager.memory.off-heap=true
    taskmanager.memory.size=<any value>
    taskmanager.memory.preallocate=true
    ```
    If `taskmanager.memory.fraction` is used instead, programs may be safe due to https://issues.apache.org/jira/browse/FLINK-7401, but the additional buffer we get from that may be too small, especially if RocksDB or other libraries using off-heap memory are used.
    
    This PR adds the `cutoff` from the `containerized.heap-cutoff-ratio`/`containerized.heap-cutoff-min` configuration parameters to `offHeapSize` as implied by the description of these two options.
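    
    A minimal sketch of the calculation described above, not Flink's actual code. The class and method names are hypothetical, and the container size, cut-off ratio, cut-off minimum and heap size in `main` are assumed example values chosen only to make the arithmetic visible.
    
    ```java
    /** Hypothetical illustration of the container memory split; not Flink's implementation. */
    final class OffHeapLimitSketch {
    
        /** Memory (MB) withheld from the container: the larger of ratio * container size and the minimum. */
        static long cutoffMB(long containerMemoryMB, double cutoffRatio, long cutoffMinMB) {
            return Math.max((long) (containerMemoryMB * cutoffRatio), cutoffMinMB);
        }
    
        /** Off-heap limit before the change: only Flink's own off-heap share; -1 means "no limit set". */
        static long offHeapBefore(long javaMemorySizeMB, long heapSizeMB) {
            return javaMemorySizeMB == heapSizeMB ? -1L : javaMemorySizeMB - heapSizeMB;
        }
    
        /** Off-heap limit after the change: the cut-off is counted as off-heap memory as well. */
        static long offHeapAfter(long containerMemoryMB, long javaMemorySizeMB, long heapSizeMB) {
            return javaMemorySizeMB == heapSizeMB ? -1L : containerMemoryMB - heapSizeMB;
        }
    
        public static void main(String[] args) {
            long containerMemoryMB = 4096; // assumed container size
            double cutoffRatio = 0.25;     // assumed value for containerized.heap-cutoff-ratio
            long cutoffMinMB = 600;        // assumed value for containerized.heap-cutoff-min
            long heapSizeMB = 2048;        // assumed outcome of the heap/off-heap split
    
            long cutoff = cutoffMB(containerMemoryMB, cutoffRatio, cutoffMinMB); // 1024
            long javaMemorySizeMB = containerMemoryMB - cutoff;                  // 3072
    
            System.out.println("cut-off:         " + cutoff + " MB");
            System.out.println("off-heap before: " + offHeapBefore(javaMemorySizeMB, heapSizeMB) + " MB"); // 1024
            System.out.println("off-heap after:  "
                    + offHeapAfter(containerMemoryMB, javaMemorySizeMB, heapSizeMB) + " MB");              // 2048
        }
    }
    ```
    
    With these assumed numbers, the limit grows by exactly the cut-off (1024 MB), which is the head room that non-Flink off-heap users such as RocksDB or the JVM itself can then draw on.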
    
    ## Brief change log
    
    - include the cut-off memory (removed from the container memory size for further calculations) in the off-heap part
    - add a unit test verifying the bug fix in a YARN environment
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
    - added `YARNSessionCapacitySchedulerITCase#perJobYarnClusterOffHeap()` test that validates that we have enough memory available and the bounds are not too strict
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes: memory calculations)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (JavaDocs)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-7400

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4506.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4506
    
----
commit 60d40cde20686b4b1b2d15dc838b15ed0cd994cc
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-09T09:53:03Z

    [FLINK-7400][cluster] fix cut-off memory not used for off-heap reserve as intended
    
    + fix description of `containerized.heap-cutoff-ratio`

commit 4135a223288608444d324da333cfdd70117c796d
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-09T14:16:31Z

    [FLINK-7400][yarn] add an integration test for yarn container memory restrictions using off-heap memory

----


---

[GitHub] flink pull request #4506: [FLINK-7400][cluster] fix off-heap limits set to c...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4506#discussion_r146498833
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java ---
    @@ -143,7 +143,8 @@ public static ContaineredTaskManagerParameters create(
     
     		// (2) split the remaining Java memory between heap and off-heap
     		final long heapSizeMB = TaskManagerServices.calculateHeapSizeMB(javaMemorySizeMB, config);
    -		final long offHeapSize = javaMemorySizeMB == heapSizeMB ? -1L : javaMemorySizeMB - heapSizeMB; 
    +		// use the cut-off memory for off-heap (that was its intention)
    +		final long offHeapSize = javaMemorySizeMB == heapSizeMB ? -1L : containerMemoryMB - heapSizeMB;
    --- End diff --
    
    Do I understand it correctly that with this change, we basically add the safety margin (cut-off) to the amount of direct memory?


---

[GitHub] flink pull request #4506: [FLINK-7400][cluster] fix off-heap limits set to c...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4506#discussion_r146479686
  
    --- Diff: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ---
    @@ -132,6 +132,36 @@ public void perJobYarnCluster() {
     	}
     
     	/**
    +	 * Test per-job yarn cluster and memory calculations for off-heap use (see FLINK-7400).
    +	 *
    +	 * <p>This also tests the prefixed CliFrontend options for the YARN case
    +	 * We also test if the requested parallelism of 2 is passed through.
    +	 * The parallelism is requested at the YARN client (-ys).
    +	 */
    +	@Test
    +	public void perJobYarnClusterOffHeap() {
    +		LOG.info("Starting perJobYarnCluster()");
    +		addTestAppender(JobClient.class, Level.INFO);
    +		File exampleJarLocation = new File("target/programs/BatchWordCount.jar");
    +		Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
    +		runWithArgs(new String[]{"run", "-m", "yarn-cluster",
    +				"-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(),
    +				"-yn", "1",
    +				"-ys", "2", //test that the job is executed with a DOP of 2
    +				"-yjm", "768",
    +				"-ytm", "1024",
    +				"-yD", "taskmanager.memory.off-heap=true",
    +				"-yD", "taskmanager.memory.size=246", // this should fit!
    --- End diff --
    
    Could you add some explanation why this fits now and not before?


---

[GitHub] flink pull request #4506: [FLINK-7400][cluster] fix off-heap limits set to c...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4506#discussion_r147656471
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java ---
    @@ -143,7 +143,8 @@ public static ContaineredTaskManagerParameters create(
     
     		// (2) split the remaining Java memory between heap and off-heap
     		final long heapSizeMB = TaskManagerServices.calculateHeapSizeMB(javaMemorySizeMB, config);
    -		final long offHeapSize = javaMemorySizeMB == heapSizeMB ? -1L : javaMemorySizeMB - heapSizeMB; 
    +		// use the cut-off memory for off-heap (that was its intention)
    +		final long offHeapSize = javaMemorySizeMB == heapSizeMB ? -1L : containerMemoryMB - heapSizeMB;
    --- End diff --
    
    Yes, and as far as I can tell, this was its original intention.
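    
    For context, a hedged sketch of how a value like `offHeapSize` could end up as the JVM's direct memory limit. The class and method names are hypothetical and this is not Flink's start-up code; the only assumptions taken from the diff are that sizes are in megabytes and that `-1` means no limit is set.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    /** Hypothetical illustration of turning the computed sizes into JVM memory flags. */
    final class JvmMemoryFlagsSketch {
    
        /** Builds heap flags and, if a limit is set (non-negative), the direct memory flag. */
        static List<String> memoryFlags(long heapSizeMB, long offHeapSizeMB) {
            List<String> flags = new ArrayList<>();
            flags.add("-Xms" + heapSizeMB + "m");
            flags.add("-Xmx" + heapSizeMB + "m");
            if (offHeapSizeMB >= 0) {
                // Assumption: a negative value (-1) means "do not restrict direct memory".
                flags.add("-XX:MaxDirectMemorySize=" + offHeapSizeMB + "m");
            }
            return flags;
        }
    
        public static void main(String[] args) {
            // Assumed example values, not taken from the PR.
            System.out.println(String.join(" ", memoryFlags(2048, 2048)));
            System.out.println(String.join(" ", memoryFlags(3072, -1)));
        }
    }
    ```
    
    Because the JVM enforces this flag for all direct buffer allocations in the process, a limit that covers only Flink's own share leaves nothing for other users of direct memory.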


---

[GitHub] flink issue #4506: [FLINK-7400][cluster] fix off-heap limits set to conserva...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4506
  
    Thanks. Let me know once you've run the cluster tests @NicoK. Then I'll try to merge the PR.


---

[GitHub] flink issue #4506: [FLINK-7400][cluster] fix off-heap limits set to conserva...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4506
  
    Thanks for running the test @NicoK. Could you please rebase this PR onto the latest master? Once Travis passes all tests, I'll merge this PR.


---

[GitHub] flink issue #4506: [FLINK-7400][cluster] fix off-heap limits set to conserva...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4506
  
    I just tested this on a real yarn cluster with `./bin/flink run -m yarn-cluster -yn 1 -ys 2 -yjm 768 -ytm 1024 -yD taskmanager.memory.off-heap=true -yD taskmanager.memory.size=260 -yD taskmanager.memory.preallocate=true ./examples/batch/WordCount.jar` (the equivalent of the test) and verified that it was failing on Flink 1.3.2 but working with the proposed fix.
    
    I also tested the counterpart without the three memory options, i.e. `./bin/flink run -m yarn-cluster -yn 1 -ys 2 -yjm 768 -ytm 1024 ./examples/batch/WordCount.jar`, and verified that it is working with the PR changes.


---

[GitHub] flink issue #4506: [FLINK-7400][cluster] fix off-heap limits set to conserva...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4506
  
    @tillrohrmann can you have a look at this?


---

[GitHub] flink issue #4506: [FLINK-7400][cluster] fix off-heap limits set to conserva...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4506
  
    Travis passed and changes look good to me. Merging this PR.


---

[GitHub] flink pull request #4506: [FLINK-7400][cluster] fix off-heap limits set to c...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4506#discussion_r147657947
  
    --- Diff: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ---
    @@ -132,6 +132,36 @@ public void perJobYarnCluster() {
     	}
     
     	/**
    +	 * Test per-job yarn cluster and memory calculations for off-heap use (see FLINK-7400).
    +	 *
    +	 * <p>This also tests the prefixed CliFrontend options for the YARN case
    +	 * We also test if the requested parallelism of 2 is passed through.
    +	 * The parallelism is requested at the YARN client (-ys).
    +	 */
    +	@Test
    +	public void perJobYarnClusterOffHeap() {
    +		LOG.info("Starting perJobYarnCluster()");
    +		addTestAppender(JobClient.class, Level.INFO);
    +		File exampleJarLocation = new File("target/programs/BatchWordCount.jar");
    +		Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
    +		runWithArgs(new String[]{"run", "-m", "yarn-cluster",
    +				"-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(),
    +				"-yn", "1",
    +				"-ys", "2", //test that the job is executed with a DOP of 2
    +				"-yjm", "768",
    +				"-ytm", "1024",
    +				"-yD", "taskmanager.memory.off-heap=true",
    +				"-yD", "taskmanager.memory.size=246", // this should fit!
    +				"-yD", "taskmanager.memory.preallocate=true", exampleJarLocation.getAbsolutePath()},
    +				/* test succeeded after this string */
    +			"Job execution complete",
    +			/* prohibited strings: (we want to see "DataSink (...) (2/2) switched to FINISHED") */
    +			new String[]{"DataSink \\(.*\\) \\(1/1\\) switched to FINISHED"},
    --- End diff --
    
    That was copied from the `perJobYarnCluster()` test this one is based on. My reading of the comment is: "`(1/1)` is prohibited, but we should see `(2/2)`".
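    
    A small sketch of that reading, assuming the prohibited strings are applied to the log output as regular expressions (the escaped parentheses suggest this, but it is an assumption). The class name and the two sample log lines are hypothetical.
    
    ```java
    import java.util.regex.Pattern;
    
    /** Hypothetical illustration of the prohibited-string check; not the test harness itself. */
    final class ProhibitedStringSketch {
    
        /** The pattern from the test: a DataSink finishing as subtask 1 of 1, i.e. with a parallelism of 1. */
        static final Pattern PROHIBITED =
                Pattern.compile("DataSink \\(.*\\) \\(1/1\\) switched to FINISHED");
    
        /** Returns true if the log line contains a match for the prohibited pattern. */
        static boolean isProhibited(String logLine) {
            return PROHIBITED.matcher(logLine).find();
        }
    
        public static void main(String[] args) {
            // Made-up log lines for illustration only.
            String parallelismOne = "DataSink (SomeOutputFormat) (1/1) switched to FINISHED";
            String parallelismTwo = "DataSink (SomeOutputFormat) (2/2) switched to FINISHED";
    
            System.out.println(isProhibited(parallelismOne)); // true: the test would fail
            System.out.println(isProhibited(parallelismTwo)); // false: the line we want to see
        }
    }
    ```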


---

[GitHub] flink pull request #4506: [FLINK-7400][cluster] fix off-heap limits set to c...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4506#discussion_r146479627
  
    --- Diff: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ---
    @@ -132,6 +132,36 @@ public void perJobYarnCluster() {
     	}
     
     	/**
    +	 * Test per-job yarn cluster and memory calculations for off-heap use (see FLINK-7400).
    +	 *
    +	 * <p>This also tests the prefixed CliFrontend options for the YARN case
    +	 * We also test if the requested parallelism of 2 is passed through.
    +	 * The parallelism is requested at the YARN client (-ys).
    +	 */
    +	@Test
    +	public void perJobYarnClusterOffHeap() {
    +		LOG.info("Starting perJobYarnCluster()");
    +		addTestAppender(JobClient.class, Level.INFO);
    +		File exampleJarLocation = new File("target/programs/BatchWordCount.jar");
    +		Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
    +		runWithArgs(new String[]{"run", "-m", "yarn-cluster",
    +				"-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(),
    +				"-yn", "1",
    +				"-ys", "2", //test that the job is executed with a DOP of 2
    +				"-yjm", "768",
    +				"-ytm", "1024",
    +				"-yD", "taskmanager.memory.off-heap=true",
    +				"-yD", "taskmanager.memory.size=246", // this should fit!
    +				"-yD", "taskmanager.memory.preallocate=true", exampleJarLocation.getAbsolutePath()},
    +				/* test succeeded after this string */
    +			"Job execution complete",
    +			/* prohibited strings: (we want to see "DataSink (...) (2/2) switched to FINISHED") */
    +			new String[]{"DataSink \\(.*\\) \\(1/1\\) switched to FINISHED"},
    --- End diff --
    
    The comment above says something different from this line: `(1/1)` vs. `(2/2)`.


---

[GitHub] flink issue #4506: [FLINK-7400][cluster] fix off-heap limits set to conserva...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4506
  
    rebased successfully, waiting for Travis now...


---

[GitHub] flink pull request #4506: [FLINK-7400][cluster] fix off-heap limits set to c...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4506


---