Posted to commits@beam.apache.org by "Kyle Winkelman (JIRA)" <ji...@apache.org> on 2018/07/26 15:35:00 UTC

[jira] [Commented] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

    [ https://issues.apache.org/jira/browse/BEAM-4783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16558437#comment-16558437 ] 

Kyle Winkelman commented on BEAM-4783:
--------------------------------------

After looking into the issue further, I don't believe it is related to Dynamic Allocation at all. I believe every BoundedSource should be split into as many blocks as are needed for each block to be roughly the same bundleSize.

As the code is currently written, it appears that every BoundedSource is split into n evenly sized blocks, where n is the defaultParallelism. As a result, one 100GB file is split very differently from eight 25GB files: each source gets n blocks regardless of its size, so the eight files produce 8 times as many blocks, each much smaller (e.g. with n=4, the single file yields 4 blocks of 25GB, while each 25GB file yields 4 blocks of about 6GB). The user shouldn't have to break one large file into smaller pieces just to get blocks that are small enough to handle.

If sources were always split by the bundleSize, the two cases above would be split almost identically. This is also how Spark behaves with an HDFS input: the file is split on the default HDFS block size.
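For illustration, here is a rough sketch of the two splitting strategies against the BoundedSource API. The helper names splitByParallelism and splitByBundleSize are made up for this comment; this is not the actual SourceRDD code.

    import java.util.List;
    import org.apache.beam.sdk.io.BoundedSource;
    import org.apache.beam.sdk.options.PipelineOptions;

    class SplitSketch {
      // Roughly the current behavior: the desired bundle size is derived from the
      // source's estimated size and defaultParallelism, so every source produces
      // about defaultParallelism bundles regardless of how large it is.
      static <T> List<? extends BoundedSource<T>> splitByParallelism(
          BoundedSource<T> source, PipelineOptions options, int defaultParallelism)
          throws Exception {
        long desiredBundleSize = source.getEstimatedSizeBytes(options) / defaultParallelism;
        return source.split(desiredBundleSize, options);
      }

      // Proposed behavior: always split on a fixed bundle size, so one 100GB source
      // and eight 25GB sources end up with bundles of comparable size.
      static <T> List<? extends BoundedSource<T>> splitByBundleSize(
          BoundedSource<T> source, PipelineOptions options, long bundleSizeBytes)
          throws Exception {
        return source.split(bundleSizeBytes, options);
      }
    }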

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> ------------------------------------------------------------
>
>                 Key: BEAM-4783
>                 URL: https://issues.apache.org/jira/browse/BEAM-4783
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>    Affects Versions: 2.5.0
>            Reporter: Kyle Winkelman
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major
>              Labels: newbie
>
> When the spark-runner is used with the configuration spark.dynamicAllocation.enabled=true, the SourceRDD does not detect this. It then falls back to the defaultParallelism value, which is determined as described here:
>       // when running on YARN/SparkDeploy it's the result of max(totalCores, 2).
>       // when running on Mesos it's 8.
>       // when running local it's the total number of cores (local = 1, local[N] = N,
>       // local[*] = estimation of the machine's cores).
>       // ** the configuration "spark.default.parallelism" takes precedence over all of the above **
> So in most cases this default is quite small. This is an issue when using a very large input file, as it may only be split in half (when the default comes out to 2).
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the DEFAULT_BUNDLE_SIZE, and possibly expose a SparkPipelineOptions option that allows changing this DEFAULT_BUNDLE_SIZE.
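
For what it's worth, the option proposed in the description above could look something like the sketch below. The getBundleSize/setBundleSize methods are hypothetical and do not exist in SparkPipelineOptions as of 2.5.0; the default value is only an assumption for the example.

    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;

    // Hypothetical options interface sketching the proposal; in practice this would
    // be added to SparkPipelineOptions rather than defined as a separate interface.
    public interface BundleSizeOptions extends PipelineOptions {
      @Description("Desired bundle size in bytes used when splitting BoundedSources "
          + "in the Spark runner.")
      @Default.Long(64 * 1024 * 1024) // assumed default, mirroring a common HDFS block size
      Long getBundleSize();

      void setBundleSize(Long value);
    }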



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)