You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/07/04 16:04:44 UTC

[GitHub] [spark] prakharjain09 commented on a change in pull request #28804: [SPARK-31973][SQL] Add ability to disable Sort,Spill in Partial aggregation

prakharjain09 commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r449784414



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
##########
@@ -63,6 +63,8 @@ case class HashAggregateExec(
 
   require(HashAggregateExec.supportsAggregate(aggregateBufferAttributes))
 
+  override def needStopCheck: Boolean = skipPartialAggregate

Review comment:
       why do we need to add stopCheck to child operators after this change?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
##########
@@ -877,44 +901,61 @@ case class HashAggregateExec(
       ("true", "true", "", "")
     }
 
-    val oomeClassName = classOf[SparkOutOfMemoryError].getName
+    val skipPartialAggregateThreshold = sqlContext.conf.skipPartialAggregateThreshold
+    val skipPartialAggRatio = sqlContext.conf.skipPartialAggregateRatio
 
+    val oomeClassName = classOf[SparkOutOfMemoryError].getName
+    val countTerm = ctx.addMutableState(CodeGenerator.JAVA_LONG, "count")
     val findOrInsertRegularHashMap: String =
       s"""
-         |// generate grouping key
-         |${unsafeRowKeyCode.code}
-         |int $unsafeRowKeyHash = ${unsafeRowKeyCode.value}.hashCode();
-         |if ($checkFallbackForBytesToBytesMap) {
-         |  // try to get the buffer from hash map
-         |  $unsafeRowBuffer =
-         |    $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, $unsafeRowKeyHash);
-         |}
-         |// Can't allocate buffer from the hash map. Spill the map and fallback to sort-based
-         |// aggregation after processing all input rows.
-         |if ($unsafeRowBuffer == null) {
-         |  if ($sorterTerm == null) {
-         |    $sorterTerm = $hashMapTerm.destructAndCreateExternalSorter();
-         |  } else {
-         |    $sorterTerm.merge($hashMapTerm.destructAndCreateExternalSorter());
+         |if (!$avoidSpillInPartialAggregateTerm) {
+         |  // generate grouping key
+         |  ${unsafeRowKeyCode.code}
+         |  int $unsafeRowKeyHash = ${unsafeRowKeyCode.value}.hashCode();
+         |  if ($checkFallbackForBytesToBytesMap) {
+         |    // try to get the buffer from hash map
+         |    $unsafeRowBuffer =
+         |      $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, $unsafeRowKeyHash);
          |  }
-         |  $resetCounter
-         |  // the hash map had be spilled, it should have enough memory now,
-         |  // try to allocate buffer again.
-         |  $unsafeRowBuffer = $hashMapTerm.getAggregationBufferFromUnsafeRow(
-         |    $unsafeRowKeys, $unsafeRowKeyHash);
-         |  if ($unsafeRowBuffer == null) {
-         |    // failed to allocate the first page
-         |    throw new $oomeClassName("No enough memory for aggregation");
+         |  // Can't allocate buffer from the hash map. Spill the map and fallback to sort-based
+         |  // aggregation after processing all input rows.
+         |  if ($unsafeRowBuffer == null && !$avoidSpillInPartialAggregateTerm) {

Review comment:
       is this check redundant? we are already checking for `!avoidSpillInPartialAggregateTerm` above?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org