Posted to issues@flink.apache.org by aljoscha <gi...@git.apache.org> on 2015/12/03 15:22:05 UTC

[GitHub] flink pull request: [FLINK-3073] Replace Streaming Mode by Memory ...

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/1433

    [FLINK-3073] Replace Streaming Mode by Memory Allocation Mode

    Before, streaming mode (either batch or streaming) would specify how
    memory is allocated on task managers.
    
    This introduces a new configuration value taskmanager.memory.allocation
    that can take values "lazy" or "eager". This controls how memory is
    allocated.
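
A minimal sketch of how the proposed `taskmanager.memory.allocation` key could be read and turned into a pre-allocation decision, assuming the `key: value` layout of `flink-conf.yaml`. `read_conf_value` and `preallocate_from_mode` are hypothetical helpers, not code from Flink's scripts. The mapping mirrors the `allocationMode.equals("eager")` line in the Scala diff reviewed below, so any value other than `eager` means lazy allocation.

```bash
# Hypothetical helpers; not taken from Flink's bin/ scripts.

# read_conf_value FILE KEY DEFAULT
# Prints the value of the last "KEY: value" line in a flink-conf.yaml-style
# file, or DEFAULT if the key is missing or the file cannot be read.
read_conf_value() {
    conf_file=$1
    conf_key=$(printf '%s' "$2" | sed 's/\./\\./g')   # escape dots for the regex
    conf_default=$3
    conf_value=$(sed -n "s/^[[:space:]]*$conf_key:[[:space:]]*//p" "$conf_file" 2>/dev/null |
        sed 's/[[:space:]]*$//' | tail -n 1)
    if [ -n "$conf_value" ]; then
        printf '%s\n' "$conf_value"
    else
        printf '%s\n' "$conf_default"
    fi
}

# preallocate_from_mode MODE
# Same rule as the Scala diff: only "eager" turns on pre-allocation.
preallocate_from_mode() {
    if [ "$1" = "eager" ]; then
        echo true
    else
        echo false
    fi
}
```

For example, `preallocate_from_mode "$(read_conf_value conf/flink-conf.yaml taskmanager.memory.allocation lazy)"` prints `false` when the key is absent. The `lazy` fallback is an assumption, since the actual default constant is not shown in this thread.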

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink stream-mode-default

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1433.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1433
    
----
commit bea4c112b7dbf0b7465b7419251173a742514967
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2015-12-01T15:13:55Z

    [FLINK-3073] Replace Streaming Mode by Memory Allocation Mode
    
    Before, streaming mode (either batch or streaming) would specify how
    memory is allocated on task managers.
    
    This introduces a new configuration value taskmanager.memory.allocation
    that can take values "lazy" or "eager". This controls how memory is
    allocated.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1433#issuecomment-162334879
  
    As followup to this, I would suggest two changes:
    
    1. Adjust the log message to reflect that the 0.7 fraction is either allocated immediately (eager allocation) or acts as a cap on managed memory (0.7 * total memory). That makes it clear from reading the log that the memory is not pre-allocated.
    
    2. The web frontend should display managed memory as (used / max). I have talked to users who were confused, thinking that despite starting the system in streaming mode, it pre-allocated all the heap way, 


Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1433#issuecomment-163221930
  
    `preallocate` can be seen as one word, no?


Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1433#discussion_r46557510
  
    --- Diff: docs/setup/cluster_setup.md ---
    @@ -236,34 +236,18 @@ bin/start-cluster.sh
     
     To stop Flink, there is also a `stop-cluster.sh` script.
     
    -
    -### Starting Flink in the streaming mode
    -
    -~~~bash
    -bin/start-cluster-streaming.sh
    -~~~
    -
    -The streaming mode changes the startup behavior of Flink: The system is not 
    -bringing up the managed memory services with preallocated memory at the beginning.
    -Flink streaming is not using the managed memory employed by the batch operators.
    -By not starting these services with preallocated memory, streaming jobs can benefit
    -from more heap space being available.
    -
    -Note that you can still start batch jobs in the streaming mode. The memory manager
    -will then allocate memory segments from the Java heap as needed.
    -
     ### Optional: Adding JobManager/TaskManager instances to a cluster
     
     You can add both TaskManager or JobManager instances to your running cluster with the `bin/taskmanager.sh` and `bin/jobmanager.sh` scripts.
     
     #### Adding a TaskManager
     <pre>
    -bin/taskmanager.sh (start [batch|streaming])|stop|stop-all)
    +bin/taskmanager.sh start|stop|stop-all
     </pre>
     
     #### Adding a JobManager
     <pre>
    -bin/jobmanager.sh (start cluster [batch|streaming])|stop|stop-all)
    +bin/jobmanager.sh start|stop|stop-all
    --- End diff --
    
    Could you preserve the "cluster" or "local" parameter?
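
A sketch only: the usage string with the `cluster`/`local` parameter preserved, plus a hypothetical `check_jobmanager_args` function that validates arguments against it. The function is illustrative and is not the actual `bin/jobmanager.sh` code.

```bash
# Hypothetical argument check for the usage
#   bin/jobmanager.sh (start (local|cluster))|stop|stop-all
# Prints the accepted action (plus the execution mode for "start");
# prints the usage to stderr and returns 1 on invalid input.
JM_USAGE="Usage: jobmanager.sh (start (local|cluster))|stop|stop-all"

check_jobmanager_args() {
    case "$1" in
        start)
            case "$2" in
                local|cluster) echo "start $2" ;;
                *) echo "$JM_USAGE" >&2; return 1 ;;
            esac
            ;;
        stop|stop-all)
            echo "$1"
            ;;
        *)
            echo "$JM_USAGE" >&2
            return 1
            ;;
    esac
}
```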


Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1433#issuecomment-162464044
  
    @StephanEwen So you want to merge the two messages @rmetzger posted? I'm on it.
    
    I'll also change the option to a boolean `lazy_alloc`, no need for the String option.


Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1433#issuecomment-163184249
  
    Looks good!
    
    One thing that just came to my mind is whether we should call the config flag "taskmanager.memory.preallocate" (default false).
    
    I think preallocate is somehow easier to get than lazy allocate...
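
A sketch of the boolean variant proposed here, `taskmanager.memory.preallocate` with a default of `false`. The parsing rule is an assumption modelled on Java's `Boolean.parseBoolean` (only a case-insensitive `true` counts as true), and `parse_preallocate` is a hypothetical helper. Setting the flag to `true` corresponds to the earlier `eager` value.

```bash
# Hypothetical helper; not Flink code.
# parse_preallocate [VALUE]
# Prints "true" only for a case-insensitive "true"; a missing or empty
# value falls back to the proposed default of false.
parse_preallocate() {
    value=$(printf '%s' "${1:-false}" | tr '[:upper:]' '[:lower:]')
    if [ "$value" = "true" ]; then
        echo true
    else
        echo false
    fi
}
```

Note that with this rule a typo such as `ture` silently means `false`, which is the usual cost of a lenient boolean parse.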


Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1433#discussion_r46559041
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -1617,7 +1598,16 @@ object TaskManager {
           }
         }
     
    -    val preAllocateMemory: Boolean = streamingMode == StreamingMode.BATCH_ONLY
    +    val allocationMode = configuration.getString(
    +      ConfigConstants.TASK_MANAGER_MEMORY_ALLOCATION_KEY,
    +      ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_ALLOCATION);
    +    val preAllocateMemory: Boolean = allocationMode.equals("eager")
    --- End diff --
    
    will do



Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1433#issuecomment-162846304
  
    Any more comments?
    
    The log messages now look like this:
    ```
    11:28:58,212 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Limiting managed memory to 0.7 of the currently free heap space (295 MB), memory will be allocated lazily.
    ```
    or 
    ```
    11:30:09,859 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Using 0.7 of the currently free heap space for managed heap memory (295 MB).
    ```
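
A sketch that reproduces the two log lines above. It assumes the size in parentheses is the managed memory itself (the configured fraction times the currently free heap, truncated to whole MB) and that a boolean selects the eager or lazy wording. `managed_memory_mb` and `memory_log_message` are hypothetical names, not TaskManager code.

```bash
# Hypothetical sketch; names and the size formula are assumptions.

# managed_memory_mb FREE_HEAP_MB FRACTION
# FRACTION of the free heap, truncated to whole megabytes.
managed_memory_mb() {
    awk -v free="$1" -v frac="$2" 'BEGIN { printf "%d\n", free * frac }'
}

# memory_log_message FREE_HEAP_MB FRACTION PREALLOCATE
# Chooses the wording depending on whether memory is pre-allocated.
memory_log_message() {
    size_mb=$(managed_memory_mb "$1" "$2")
    if [ "$3" = "true" ]; then
        echo "Using $2 of the currently free heap space for managed heap memory ($size_mb MB)."
    else
        echo "Limiting managed memory to $2 of the currently free heap space ($size_mb MB), memory will be allocated lazily."
    fi
}
```

For example, `memory_log_message 421 0.7 false` reports 294 MB with the lazy wording.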


Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1433#issuecomment-163177539
  
    Any objections against me merging this now?


Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1433#issuecomment-162846459
  
    I also discovered some other stuff, for example, the off-heap memory usage is not printed by the `MemoryLogger` (max is fixing this) and some stats in the web frontend about TaskManagers are wrong. For example "Free Memory" is simply the size of the JVM Heap.


Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1433#discussion_r46557639
  
    --- Diff: docs/setup/cluster_setup.md ---
    @@ -236,34 +236,18 @@ bin/start-cluster.sh
     
     To stop Flink, there is also a `stop-cluster.sh` script.
     
    -
    -### Starting Flink in the streaming mode
    -
    -~~~bash
    -bin/start-cluster-streaming.sh
    -~~~
    -
    -The streaming mode changes the startup behavior of Flink: The system is not 
    -bringing up the managed memory services with preallocated memory at the beginning.
    -Flink streaming is not using the managed memory employed by the batch operators.
    -By not starting these services with preallocated memory, streaming jobs can benefit
    -from more heap space being available.
    -
    -Note that you can still start batch jobs in the streaming mode. The memory manager
    -will then allocate memory segments from the Java heap as needed.
    -
     ### Optional: Adding JobManager/TaskManager instances to a cluster
     
     You can add both TaskManager or JobManager instances to your running cluster with the `bin/taskmanager.sh` and `bin/jobmanager.sh` scripts.
     
     #### Adding a TaskManager
     <pre>
    -bin/taskmanager.sh (start [batch|streaming])|stop|stop-all)
    +bin/taskmanager.sh start|stop|stop-all
     </pre>
     
     #### Adding a JobManager
     <pre>
    -bin/jobmanager.sh (start cluster [batch|streaming])|stop|stop-all)
    +bin/jobmanager.sh start|stop|stop-all
    --- End diff --
    
    fixed it


Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1433#discussion_r46558633
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -1617,7 +1598,16 @@ object TaskManager {
           }
         }
     
    -    val preAllocateMemory: Boolean = streamingMode == StreamingMode.BATCH_ONLY
    +    val allocationMode = configuration.getString(
    +      ConfigConstants.TASK_MANAGER_MEMORY_ALLOCATION_KEY,
    +      ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_ALLOCATION);
    +    val preAllocateMemory: Boolean = allocationMode.equals("eager")
    --- End diff --
    
    Could we verify whether we only have "eager" or "lazy" and throw an Exception otherwise?
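
A sketch of the suggested check: accept only `eager` or `lazy` and fail loudly otherwise, instead of silently treating unknown values as lazy. In the Scala code this would be an exception; in this hypothetical shell helper, `validate_allocation_mode`, it is a non-zero return code with a message on stderr.

```bash
# Hypothetical helper; not Flink code.
# validate_allocation_mode MODE
# Echoes MODE if it is "eager" or "lazy"; otherwise reports an error.
validate_allocation_mode() {
    case "$1" in
        eager|lazy)
            printf '%s\n' "$1"
            ;;
        *)
            echo "Invalid value '$1' for taskmanager.memory.allocation: expected 'eager' or 'lazy'" >&2
            return 1
            ;;
    esac
}
```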


Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1433#discussion_r46560453
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -1617,7 +1598,16 @@ object TaskManager {
           }
         }
     
    -    val preAllocateMemory: Boolean = streamingMode == StreamingMode.BATCH_ONLY
    +    val allocationMode = configuration.getString(
    +      ConfigConstants.TASK_MANAGER_MEMORY_ALLOCATION_KEY,
    +      ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_ALLOCATION);
    +    val preAllocateMemory: Boolean = allocationMode.equals("eager")
    --- End diff --
    
    There is already a lazy allocation flag (boolean) Can you reuse that simply?


Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1433#issuecomment-163187358
  
    PR looks good. +1 for `taskmanager.memory.preallocate`.


Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1433#discussion_r46558585
  
    --- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
    @@ -63,7 +57,7 @@ if [[ $STARTSTOP == "start" ]]; then
             #
             TM_MAX_OFFHEAP_SIZE="8388607T"
     
    -        if [[ "${STREAMINGMODE}" == "batch" ]] && useOffHeapMemory; then
    +        if [[ "${FLINK_TM_MEM_ALLOC}" == "eager" ]] && useOffHeapMemory; then
    --- End diff --
    
    are you sure? I just preserved the previous behavior. (eager == batch_mode, lazy == streaming_mode)


Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1433#discussion_r46558419
  
    --- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
    @@ -63,7 +57,7 @@ if [[ $STARTSTOP == "start" ]]; then
             #
             TM_MAX_OFFHEAP_SIZE="8388607T"
     
    -        if [[ "${STREAMINGMODE}" == "batch" ]] && useOffHeapMemory; then
    +        if [[ "${FLINK_TM_MEM_ALLOC}" == "eager" ]] && useOffHeapMemory; then
    --- End diff --
    
    For the off-heap memory case, the JVM heap memory should be reduced regardless of eager or lazy memory allocation. I think this can be changed to
    
    ```bash
    if useOffHeapMemory; then
    ```
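
A sketch of the condition suggested here: shrink the JVM heap whenever managed memory lives off-heap, independent of eager or lazy allocation. It assumes, for illustration only, that off-heap managed memory takes a fixed fraction of the configured TaskManager memory; `jvm_heap_mb` is a hypothetical helper and not the logic of Flink's `taskmanager.sh`.

```bash
# Hypothetical sketch of the suggested condition; not Flink's taskmanager.sh.
# Assumption: off-heap managed memory takes FRACTION of the configured
# TaskManager memory, so the JVM heap keeps only the remainder.

# jvm_heap_mb TOTAL_MB FRACTION OFF_HEAP
jvm_heap_mb() {
    total_mb=$1; fraction=$2; off_heap=$3
    if [ "$off_heap" = "true" ]; then
        # Off-heap managed memory: shrink the heap, whether that memory is
        # later allocated eagerly or lazily.
        awk -v t="$total_mb" -v f="$fraction" 'BEGIN { printf "%d\n", t - t * f }'
    else
        # On-heap managed memory lives inside the heap, so keep the full size.
        echo "$total_mb"
    fi
}
```

The function intentionally has no allocation-mode parameter: `jvm_heap_mb 1024 0.5 true` prints 512 whether or not the memory is later pre-allocated.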


Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1433#discussion_r46563274
  
    --- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
    @@ -63,7 +57,7 @@ if [[ $STARTSTOP == "start" ]]; then
             #
             TM_MAX_OFFHEAP_SIZE="8388607T"
     
    -        if [[ "${STREAMINGMODE}" == "batch" ]] && useOffHeapMemory; then
    +        if [[ "${FLINK_TM_MEM_ALLOC}" == "eager" ]] && useOffHeapMemory; then
    --- End diff --
    
    You might want to allocate lazily in batch jobs with off-heap memory. In the current state of the pull request, we could run into out of memory errors then. On the other hand, I see that we don't want to subtract heap memory in streaming with lazy memory allocation (because we don't allocate managed memory for streaming jobs for now).


Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1433#issuecomment-162469441
  
    Did the changes to boolean option and merging of log messages.


Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1433#issuecomment-163192234
  
    +1 from my side, after the switch to "preallocate"


Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1433#issuecomment-163264394
  
    Manually merged


Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1433#issuecomment-162334401
  
    Tested the change on a cluster with a job reading some 100GBs of data ...
    looks good ..
    
    ```
    19:07:19,537 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Using 0.7 of the currently free heap space for Flink managed heap memory (9001 MB).
    19:07:19,538 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - MemoryManager will be initialized with lazy memory allocation.
    ```


Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1433#issuecomment-163194389
  
    is `pre-allocate` also ok? btw, we have a wild mixture of option styles, there is camelCase, under_score, and with-hyphens.


Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha closed the pull request at:

    https://github.com/apache/flink/pull/1433


Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1433#issuecomment-163184597
  
    Can do, yes. 

