Posted to issues@spark.apache.org by "Weichen Xu (JIRA)" <ji...@apache.org> on 2017/10/26 08:41:00 UTC

[jira] [Updated] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter

     [ https://issues.apache.org/jira/browse/SPARK-22357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Weichen Xu updated SPARK-22357:
-------------------------------
    Description: 
This is a bug in binaryFiles: even though we pass it minPartitions, binaryFiles ignores it.
The bug was introduced between Spark 2.0 and Spark 2.1. In PortableDataStream.scala the argument “minPartitions” is no longer used (with the push to master on 11/7/6):

{code}
/**
 * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API
 * which is set through setMaxSplitSize
 */
def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
    val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
    val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
    val defaultParallelism = sc.defaultParallelism
    val files = listStatus(context).asScala
    val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    super.setMaxSplitSize(maxSplitSize)
}
{code}

The code previously, in version 2.0, was:
{code}
def setMinPartitions(context: JobContext, minPartitions: Int) {
    val totalLen = listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
    val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
    super.setMaxSplitSize(maxSplitSize)
}
{code}
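
To make the difference concrete, here is a small self-contained sketch (not Spark source; the file sizes, config defaults, and parallelism below are made-up assumptions) that evaluates both formulas for the same hypothetical job. The 2.0 result scales with minPartitions, while the 2.1+ result does not depend on it at all:

{code}
object SplitSizeComparison {
  def main(args: Array[String]): Unit = {
    // Hypothetical workload: 1000 binary files of 10 MB each, with the user
    // asking for minPartitions = 1000.
    val fileSizes = Seq.fill(1000)(10L * 1024 * 1024)
    val minPartitions = 1000

    // Spark 2.0 formula: the split size is derived directly from minPartitions.
    val totalLen = fileSizes.sum
    val oldMaxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong

    // Spark 2.1+ formula: the split size is derived from config defaults and the
    // default parallelism only; minPartitions never appears in the calculation.
    val defaultMaxSplitBytes = 128L * 1024 * 1024 // assumed spark.files.maxPartitionBytes
    val openCostInBytes = 4L * 1024 * 1024        // assumed spark.files.openCostInBytes
    val defaultParallelism = 8                    // assumed sc.defaultParallelism
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    val newMaxSplitSize =
      Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

    println(s"2.0  maxSplitSize: $oldMaxSplitSize")  // ~10 MB => roughly 1000 splits
    println(s"2.1+ maxSplitSize: $newMaxSplitSize")  // 128 MB, regardless of minPartitions
  }
}
{code}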

The new code is clever, but it ignores the value the user passes in and derives the split size from the data size alone, which is effectively a breaking change.
In our specific case this was a problem: we initially read in just the file names, and the dataframe only becomes very large afterwards, when the images themselves are read in. In this situation the new code does not handle the partitioning well.
I’m not sure whether it can be easily fixed, because I don’t understand the full context of the change in Spark, but at the very least the unused parameter should be removed to avoid confusion.
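
For reference, a minimal way to observe the symptom (assuming an existing SparkContext named sc, e.g. in spark-shell, and a hypothetical directory of many small binary files) would be something like:

{code}
// Reproduction sketch: the path and the requested partition count are hypothetical.
// On Spark 2.0 the requested minPartitions is roughly honoured; on 2.1+ the number
// of partitions is driven by the size-based formula instead.
val rdd = sc.binaryFiles("/data/images", minPartitions = 1000)
println(rdd.getNumPartitions)
{code}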



> SparkContext.binaryFiles ignore minPartitions parameter
> -------------------------------------------------------
>
>                 Key: SPARK-22357
>                 URL: https://issues.apache.org/jira/browse/SPARK-22357
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.2, 2.2.0
>            Reporter: Weichen Xu
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org