You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Chen Zhang (Jira)" <ji...@apache.org> on 2020/07/07 10:13:00 UTC

[jira] [Comment Edited] (SPARK-31635) Spark SQL Sort fails when sorting big data points

    [ https://issues.apache.org/jira/browse/SPARK-31635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17147653#comment-17147653 ] 

Chen Zhang edited comment on SPARK-31635 at 7/7/20, 10:12 AM:
--------------------------------------------------------------

In fact, the RDD API corresponding to _DF.sort().take()_ is _RDD.takeOrdered()_

The execution logic of _RDD.sortBy().take()_ is the reservoir sampling + global bucket Sorting, and the required number of data is returned after the global sorting result is obtained.All major computation are performed in the executor process.

The execution logic of _RDD.takeOrdered()_ is to compute TOPK in each RDD partition in the executor process(by QuickSelect), and then return each TOPK result to the driver process for merging(by PriorityQueue).

To get the same result, it is obvious that the second method based on PriorityQueue has better performance.

I think that the implementation of _RDD.takeOrdered()_ can be improved, using a configurable option to decide whether the TOPK data merge process occurs in the driver process or the executor process. If it occurs in the driver process, it can reduce the time for waiting for computation. If it occurs in the executor process, it can reduce the memory pressure of the driver process.

something like:
 (org.apache.spark.rdd.RDD class)
{code:scala}
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    if (num == 0 || partitions.length == 0) {
      Array.empty
    } else {
      if (conf.getBoolean("spark.rdd.takeOrdered.mergeInDriver", true)) {
        val mapRDDs = mapPartitions { items =>
          // Priority keeps the largest elements, so let's reverse the ordering.
          val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
          queue ++= collectionUtils.takeOrdered(items, num)(ord)
          Iterator.single(queue)
        }
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1
        }.toArray.sorted(ord)
      } else {
        mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.repartition(1).mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.collect()
      }
    }
  }
{code}


was (Author: chen zhang):
In fact, the RDD API corresponding to _DF.sort().take()_ is _RDD.takeOrdered()_

The execution logic of _RDD.sortBy().take()_ is the reservoir sampling + global bucket Sorting, and the required number of data is returned after the global sorting result is obtained.All major computation are performed in the executor process.

The execution logic of _RDD.takeOrdered()_ is to compute TOPK(by PriorityQueue) in each RDD partition in the executor process, and then return each TOPK result to the driver process for merging.

To get the same result, it is obvious that the second method based on PriorityQueue has better performance.

I think that the implementation of _RDD.takeOrdered()_ can be improved, using a configurable option to decide whether the TOPK data merge process occurs in the driver process or the executor process. If it occurs in the driver process, it can reduce the time for waiting for computation. If it occurs in the executor process, it can reduce the memory pressure of the driver process.

something like:
 (org.apache.spark.rdd.RDD class)
{code:scala}
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    if (num == 0 || partitions.length == 0) {
      Array.empty
    } else {
      if (conf.getBoolean("spark.rdd.take.ordered.driver.merge", true)) {
        val mapRDDs = mapPartitions { items =>
          // Priority keeps the largest elements, so let's reverse the ordering.
          val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
          queue ++= collectionUtils.takeOrdered(items, num)(ord)
          Iterator.single(queue)
        }
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1
        }.toArray.sorted(ord)
      } else {
        mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.repartition(1).mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.collect()
      }
    }
  }
{code}

> Spark SQL Sort fails when sorting big data points
> -------------------------------------------------
>
>                 Key: SPARK-31635
>                 URL: https://issues.apache.org/jira/browse/SPARK-31635
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.2
>            Reporter: George George
>            Priority: Major
>
>  Please have a look at the example below: 
> {code:java}
> case class Point(x:Double, y:Double)
> case class Nested(a: Long, b: Seq[Point])
> val test = spark.sparkContext.parallelize((1L to 100L).map(a => Nested(a,Seq.fill[Point](250000)(Point(1,2)))), 100)
> test.toDF().as[Nested].sort("a").take(1)
> {code}
>  *Sorting* big data objects using *Spark Dataframe* is failing with following exception: 
> {code:java}
> 2020-05-04 08:01:00 ERROR TaskSetManager:70 - Total size of serialized results of 14 tasks (107.8 MB) is bigger than spark.driver.maxResultSize (100.0 MB)
> [Stage 0:======>                                                 (12 + 3) / 100]org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 13 tasks (100.1 MB) is bigger than spark.driver.maxResu
> {code}
> However using the *RDD API* is working and no exception is thrown: 
> {code:java}
> case class Point(x:Double, y:Double)
> case class Nested(a: Long, b: Seq[Point])
> val test = spark.sparkContext.parallelize((1L to 100L).map(a => Nested(a,Seq.fill[Point](250000)(Point(1,2)))), 100)
> test.sortBy(_.a).take(1)
> {code}
> For both code snippets we started the spark shell with exactly the same arguments:
> {code:java}
> spark-shell --driver-memory 6G --conf "spark.driver.maxResultSize=100MB"
> {code}
> Even if we increase the spark.driver.maxResultSize, the executors still get killed for our use case. The interesting thing is that when using the RDD API directly the problem is not there. *Looks like there is a bug in dataframe sort because is shuffling too much data to the driver?* 
> Note: this is a small example and I reduced the spark.driver.maxResultSize to a smaller size, but in our application I've tried setting it to 8GB but as mentioned above the job was killed. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org