Posted to dev@spark.apache.org by "Ulanov, Alexander" <al...@hpe.com> on 2016/04/26 20:10:38 UTC

Number of partitions for binaryFiles

Dear Spark developers,

I have 100 binary files in the local file system that I want to load into a Spark RDD. I need the data from each file to be in a separate partition. However, I cannot make that happen:

scala> sc.binaryFiles("/data/subset").partitions.size
res5: Int = 66

The "minPartitions" parameter does not seems to help:
scala> sc.binaryFiles("/data/subset", minPartitions = 100).partitions.size
res8: Int = 66

At the same time, Spark produces the required number of partitions with sc.textFile (though I cannot use it because my files are binary):
scala> sc.textFile("/data/subset").partitions.size
res9: Int = 100

Could you suggest how to force Spark to load binary files each in a separate partition?

Best regards, Alexander

RE: Number of partitions for binaryFiles

Posted by "Ulanov, Alexander" <al...@hpe.com>.
The issue is that the data was specifically prepared so that each file is a single partition, both computationally and logically. It seems strange that one cannot override the default behavior. Performing another round of re-partitioning within Spark might be too expensive because it would involve shuffling.
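
A minimal sketch of that re-partitioning approach (not something taken from this thread, and assuming the shuffle cost were acceptable) would be:

scala> // Reaches the requested 100 partitions, but redistributes the file contents over the network.
scala> sc.binaryFiles("/data/subset").repartition(100).partitions.size
res0: Int = 100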


Re: Number of partitions for binaryFiles

Posted by Ted Yu <yu...@gmail.com>.
From what I understand, the Spark code was written this way so that you don't end up with very small partitions.

In your case, look at the size of the cluster.
If 66 partitions can make good use of your cluster, it should be fine.
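
A rough way to check that (a sketch, not from this thread) is to compare the partition count against the default parallelism Spark derives from the cluster's cores:

scala> val rdd = sc.binaryFiles("/data/subset")
scala> // true if there is at least one partition per available core
scala> rdd.partitions.size >= sc.defaultParallelism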


RE: Number of partitions for binaryFiles

Posted by "Ulanov, Alexander" <al...@hpe.com>.
Hi Ted,

I have 36 files of ~600KB each, and the remaining 74 are about 400KB.

Is there a workaround other than changing Spark's code?

Best regards, Alexander
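
One possible workaround, sketched here only as an idea and not something from this thread, is to bypass binaryFiles entirely: parallelize the list of file names with one element per partition and read each file inside the task. This assumes every executor can read the same local path /data/subset:

scala> import java.nio.file.{Files, Paths}
scala> // one file name per partition; file contents are read on the executors
scala> val paths = new java.io.File("/data/subset").listFiles.map(_.getPath)
scala> val rdd = sc.parallelize(paths, paths.length).map(p => (p, Files.readAllBytes(Paths.get(p))))
scala> rdd.partitions.size
res0: Int = 100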



Re: Number of partitions for binaryFiles

Posted by Ted Yu <yu...@gmail.com>.
Here is the body of StreamFileInputFormat#setMinPartitions:

  def setMinPartitions(context: JobContext, minPartitions: Int) {
    val totalLen = listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
    val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
    super.setMaxSplitSize(maxSplitSize)
  }

I guess what happened was that among the 100 files you had, there were ~60
files whose sizes were much bigger than the rest.
According to the way max split size is computed above, you ended up with
fewer partitions.
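
Plugging made-up numbers into the formula above (not the actual sizes from this thread) shows the effect: with roughly 50 MB of input in total and minPartitions = 100, the computed max split size is about 512 KB, and the number of combined splits that fit under that cap, rather than minPartitions itself, ends up determining the partition count.

scala> val minPartitions = 100
minPartitions: Int = 100
scala> val totalLen = 50L * 1024 * 1024   // hypothetical total input size, ~50 MB
totalLen: Long = 52428800
scala> val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
maxSplitSize: Long = 524288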

I just performed a test using local directory where 3 files were
significantly larger than the rest and reproduced what you observed.

Cheers
