Posted to issues@flink.apache.org by mxm <gi...@git.apache.org> on 2015/11/16 19:17:36 UTC

[GitHub] flink pull request: [FLINK-3020][streaming] set local default para...

GitHub user mxm opened a pull request:

    https://github.com/apache/flink/pull/1360

    [FLINK-3020][streaming] set local default parallelism to maximum parallelism

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink FLINK-3020

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1360.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1360
    
----
commit 0aeb52ced687912cb3e3c268d27486aa5be5936f
Author: Maximilian Michels <mx...@apache.org>
Date:   2015-11-16T18:04:31Z

    [FLINK-3020][streaming] set local default parallelism to maximum parallelism

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3020][streaming] set number of task slo...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1360#issuecomment-158049463
  
    Exactly. But that makes sense, right? If the user explicitly sets the number of task slots, we shouldn't change the number of task slots automatically.



[GitHub] flink pull request: [FLINK-3020][streaming] set number of task slo...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1360#issuecomment-158019230
  
    Yes, in such a case it would fail. Thinking about it, you're right that it would give a better user experience if the maximum parallelism of the job were taken instead of the default parallelism. Maybe we should then change it for the batch part as well.



[GitHub] flink pull request: [FLINK-3020][streaming] set number of task slo...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1360



[GitHub] flink pull request: [FLINK-3020][streaming] set number of task slo...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1360#issuecomment-158024467
  
    Alright. I will push the original pull request version again which uses the max parallelism of all operators. Further, I will open a separate JIRA for the batch side change.



[GitHub] flink pull request: [FLINK-3020][streaming] set number of task slo...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1360#issuecomment-158009498
  
    True. That's how it is handled on the batch side. I'm not sure about this behavior, though. If a user sets a default parallelism but uses operators with `parallelism > defaultParallelism`, this would fail, right? The rationale behind this is probably to maximize the parallelism for all operators and not have operators with exceptionally high parallelism.



[GitHub] flink pull request: [FLINK-3020][streaming] set number of task slo...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1360#issuecomment-158334929
  
    I think we agree that we want to set the number of task slots to the maximum parallelism instead of the default one. I'll merge this later on.
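
The rule agreed on above can be sketched in plain Java. This is only an illustration, not the merged Flink code: `MaxParallelismSlots` and `taskSlots` are hypothetical names, operator parallelisms are modeled as plain integers, and treating the default parallelism as a lower bound is an assumption (operators without an explicit setting are assumed to run with the default).

```java
// Hypothetical sketch; not part of Flink.
final class MaxParallelismSlots {

    // Number of task slots = highest parallelism requested by any operator.
    // Assumption: operators without an explicit setting use the default,
    // so the result is never lower than defaultParallelism.
    static int taskSlots(int defaultParallelism, int... operatorParallelisms) {
        int slots = defaultParallelism;
        for (int p : operatorParallelisms) {
            slots = Math.max(slots, p);
        }
        return slots;
    }

    public static void main(String[] args) {
        // Default parallelism 4, one operator explicitly set to 8.
        System.out.println(taskSlots(4, 2, 8, 4));
    }
}
```

With this rule, an operator whose parallelism exceeds the default no longer makes local execution fail for lack of slots.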



[GitHub] flink pull request: [FLINK-3020][streaming] set number of task slo...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1360#issuecomment-157766781
  
    I think the maximum parallelism of the job should be taken if the user has not specified a different parallelism than the default one (`-1`). I think that's also how the batch part does it (here the `LocalExecutor` has a field `taskManagerNumSlots` which can be set). Besides that, looks good to merge.



[GitHub] flink pull request: [FLINK-3020][streaming] set number of task slo...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1360#discussion_r45327211
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java ---
    @@ -91,7 +91,11 @@ public JobExecutionResult execute(String jobName) throws Exception {
     		configuration.addAll(jobGraph.getJobConfiguration());
     
     		configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
    -		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getParallelism());
    +
    +		int parallelism = getParallelism() == defaultLocalParallelism ?
    +				defaultLocalParallelism : jobGraph.getMaximumParallelism();
    --- End diff --
    
    `defaultLocalParallelism` seems to be the number of available processors. I think it is very unintuitive that the default parallelism is only taken if it equals the number of processors.
    
    This means that, in the case of an operator with a higher degree of parallelism (dop) than the #cores, the program will only fail if the user sets the default parallelism to #cores.
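
To make this concern concrete, here is a minimal Java sketch of the condition in the diff. It is not Flink code: `SlotConditionSketch` and `slotsUnderReview` are hypothetical names, and plain integers stand in for `getParallelism()`, `defaultLocalParallelism` and `jobGraph.getMaximumParallelism()`. It also assumes that the computed value is then used as the number of task slots, which the diff excerpt does not show.

```java
// Hypothetical model of the condition under review; not part of Flink.
final class SlotConditionSketch {

    // Mirrors the ternary in the diff: keep the default only when the
    // configured parallelism equals it, otherwise use the job's maximum.
    static int slotsUnderReview(int configuredParallelism,
                                int defaultLocalParallelism,
                                int maxOperatorParallelism) {
        return configuredParallelism == defaultLocalParallelism
                ? defaultLocalParallelism
                : maxOperatorParallelism;
    }

    public static void main(String[] args) {
        int cores = 4;        // assumed value of defaultLocalParallelism
        int maxOperator = 8;  // one operator asks for more than #cores

        // Parallelism left at #cores: the result stays at the default.
        System.out.println(slotsUnderReview(cores, cores, maxOperator));
        // Any other setting, e.g. 2: the job maximum is used instead.
        System.out.println(slotsUnderReview(2, cores, maxOperator));
    }
}
```

In this model, the slot count falls short of the widest operator only when the configured parallelism happens to equal the number of cores, which is the asymmetry described above.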



[GitHub] flink pull request: [FLINK-3020][streaming] set number of task slo...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1360#issuecomment-158032582
  
    Just checked. The batch side always uses the maximum parallelism as the number of task slots (if they are not set explicitly). Till and I actually thought differently. So the proposed changes in this PR align with the batch behavior.
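
The batch-side rule described here can be modeled as follows. This is a sketch under stated assumptions rather than the actual `LocalExecutor` logic: `BatchSlotSketch`, `taskSlots` and the `SLOTS_NOT_SET` sentinel are hypothetical names chosen for illustration.

```java
// Hypothetical model of the batch-side rule; not part of Flink.
final class BatchSlotSketch {

    // Assumed sentinel meaning "the user did not set the number of slots".
    static final int SLOTS_NOT_SET = -1;

    // An explicit slot count wins; otherwise fall back to the job maximum.
    static int taskSlots(int explicitSlots, int maxParallelism) {
        return explicitSlots == SLOTS_NOT_SET ? maxParallelism : explicitSlots;
    }

    public static void main(String[] args) {
        System.out.println(taskSlots(SLOTS_NOT_SET, 8)); // slots not set
        System.out.println(taskSlots(2, 8));             // slots set explicitly
    }
}
```

In the second call the explicit value is kept even though it is lower than the job maximum, so a job with a wider operator could still fail in that configuration.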



[GitHub] flink pull request: [FLINK-3020][streaming] set number of task slo...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1360#discussion_r45326391
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java ---
    @@ -91,7 +91,11 @@ public JobExecutionResult execute(String jobName) throws Exception {
     		configuration.addAll(jobGraph.getJobConfiguration());
     
     		configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
    -		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getParallelism());
    +
    +		int parallelism = getParallelism() == defaultLocalParallelism ?
    +				defaultLocalParallelism : jobGraph.getMaximumParallelism();
    --- End diff --
    
    This currently means that, in the default case, the number of slots does not respect the operator with the maximum parallelism, correct?



[GitHub] flink pull request: [FLINK-3020][streaming] set number of task slo...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1360#issuecomment-158052735
  
    Yes, I think so. I guess it's due to the difference that batch programs are executed using a `PlanExecutor`, whereas streaming programs are directly executed by the `StreamExecutionEnvironment`. Consequently, there is no direct way of specifying the number of task slots for the `LocalFlinkMiniCluster` in use other than via the parallelism of the job or the default parallelism.



[GitHub] flink pull request: [FLINK-3020][streaming] set number of task slo...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1360#issuecomment-158022328
  
    +1 for taking the max parallelism of all operators



[GitHub] flink pull request: [FLINK-3020][streaming] set number of task slo...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1360#issuecomment-158047994
  
    But this is only true if the `taskManagerNumSlots` field in `LocalExecutor` is left untouched. Currently, this is the case, but it is not enforced.



[GitHub] flink pull request: [FLINK-3020][streaming] set number of task slo...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1360#issuecomment-158020011
  
    I would also be in favor of changing the local execution to always use the maximum specified parallelism as the number of task slots. IMHO the current behavior is not intuitive. The default parallelism currently acts as a maximum parallelism in local execution.

