Posted to issues@spark.apache.org by "Herman van Hovell (JIRA)" <ji...@apache.org> on 2016/11/25 16:10:59 UTC

[jira] [Comment Edited] (SPARK-17788) RangePartitioner results in few very large tasks and many small to empty tasks

    [ https://issues.apache.org/jira/browse/SPARK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15696134#comment-15696134 ] 

Herman van Hovell edited comment on SPARK-17788 at 11/25/16 4:10 PM:
---------------------------------------------------------------------

Spark makes a sketch of your data as soon as you want to order the entire dataset. Based on that sketch, Spark tries to create equally sized partitions. As [~holdenk] said, your problem is caused by skew (a lot of rows with the same key), and none of the current partitioning schemes can help you with this. In the short run, you could follow her suggestion and add noise to the ordering key (this only works for a global ordering, not for joins/aggregations with skewed values). In the long run, there is an ongoing effort to reduce skew for joins; see SPARK-9862 for more information.
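
For illustration, here is a minimal sketch of the "add noise" idea for a global sort (assuming a Spark 2.x shell with its default imports; the skewed example frame and the rand() tie-breaker are made up for this sketch, and the tie-breaker does not change the overall ordering by value):
{noformat}
import org.apache.spark.sql.functions.rand

// Hypothetical skewed input: every row carries one of only three distinct values.
val skewed = spark.range(10000000).selectExpr("id", "id % 3 as value")

// Adding a random tie-breaker as a secondary sort key lets the range partitioner
// spread rows with identical values over several partitions, while the result is
// still globally ordered by "value".
val spread = skewed.orderBy($"value", rand())
{noformat}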

I have created the following little Spark program to illustrate how range partitioning works:
{noformat}
import org.apache.spark.sql.Row
// spark-shell imports these automatically; they are spelled out so the snippet is self-contained.
import org.apache.spark.sql.functions._
import spark.implicits._

// Set the parallelism and the number of shuffle partitions to a relatively low value so we can read the results.
spark.conf.set("spark.default.parallelism", "20")
spark.conf.set("spark.sql.shuffle.partitions", "20")

// Create a skewed data frame.
val df = spark
  .range(10000000)
  .select(
    $"id",
    (rand(34) * when($"id" % 10 <= 7, lit(1.0)).otherwise(lit(10.0))).as("value"))

// Make a summary per partition. The partition intervals should not overlap and the number of
// elements in a partition should roughly be the same for all partitions.
case class PartitionSummary(count: Long, min: Double, max: Double, range: Double)
val res = df.orderBy($"value").mapPartitions { iterator =>
  val (count, min, max) = iterator.foldLeft((0L, Double.PositiveInfinity, Double.NegativeInfinity)) {
    case ((count, min, max), Row(_, value: Double)) =>
      (count + 1L, Math.min(min, value), Math.max(max, value))
  }
  Iterator.single(PartitionSummary(count, min, max, max - min))
}

// Get results and make them look nice
res.orderBy($"min")
  .select($"count", $"min".cast("decimal(5,3)"), $"max".cast("decimal(5,3)"), $"range".cast("decimal(5,3)"))
  .show(30)
{noformat}

This yields the following results (notice how the partition ranges vary considerably while the row counts stay roughly the same):
{noformat}
+------+-----+------+-----+                                                     
| count|  min|   max|range|
+------+-----+------+-----+
|484005|0.000| 0.059|0.059|
|426212|0.059| 0.111|0.052|
|381796|0.111| 0.157|0.047|
|519954|0.157| 0.221|0.063|
|496842|0.221| 0.281|0.061|
|539082|0.281| 0.347|0.066|
|516798|0.347| 0.410|0.063|
|558487|0.410| 0.478|0.068|
|419825|0.478| 0.529|0.051|
|402257|0.529| 0.578|0.049|
|557225|0.578| 0.646|0.068|
|518626|0.646| 0.710|0.063|
|611478|0.710| 0.784|0.075|
|544556|0.784| 0.851|0.066|
|454356|0.851| 0.906|0.055|
|450535|0.906| 0.961|0.055|
|575996|0.961| 2.290|1.329|
|525915|2.290| 4.920|2.630|
|518757|4.920| 7.510|2.590|
|497298|7.510|10.000|2.490|
+------+-----+------+-----+
{noformat}
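
As an aside, if you want to check the per-partition row counts for your own sorted data frame, a quick way (just a sketch, reusing the df defined above in the same shell session) is to drop to the RDD API and count the rows per partition:
{noformat}
// Tag each output partition of the range-partitioned sort with its index
// and count the rows that land in it.
df.orderBy($"value").rdd
  .mapPartitionsWithIndex((i, rows) => Iterator((i, rows.size)))
  .collect()
  .foreach { case (i, n) => println(s"partition $i: $n rows") }
{noformat}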


> RangePartitioner results in few very large tasks and many small to empty tasks 
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-17788
>                 URL: https://issues.apache.org/jira/browse/SPARK-17788
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 2.0.0
>         Environment: Ubuntu 14.04 64bit
> Java 1.8.0_101
>            Reporter: Babak Alipour
>
> Greetings everyone,
> I was trying to read a single field of a Hive table stored as Parquet in Spark (~140GB for the entire table, this single field is a Double, ~1.4B records) and look at the sorted output using the following:
> sql("SELECT " + field + " FROM MY_TABLE ORDER BY " + field + " DESC") 
> But this simple line of code gives:
> Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with more than 17179869176 bytes
> Same error for:
> sql("SELECT " + field + " FROM MY_TABLE).sort(field)
> and:
> sql("SELECT " + field + " FROM MY_TABLE).orderBy(field)
> After doing some searching, the issue seems to lie in the RangePartitioner trying to create equal ranges. [1]
> [1] https://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/RangePartitioner.html 
> The Double values I'm trying to sort are mostly in the range [0,1] (~70% of the data, which roughly equates to 1 billion records); other values in the dataset are as high as 2000. With the RangePartitioner trying to create equal ranges, some tasks become almost empty while others grow extremely large, due to the heavily skewed distribution.
> This is either a bug in Apache Spark or a major limitation of the framework. I hope one of the devs can help solve this issue.
> P.S. Email thread on Spark user mailing list:
> http://mail-archives.apache.org/mod_mbox/spark-user/201610.mbox/%3CCA%2B_of14hTVYTUHXC%3DmS9Kqd6qegVvkoF-ry3Yj2%2BRT%2BWSBNzhg%40mail.gmail.com%3E


