Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/07/04 17:25:43 UTC

[GitHub] [spark] karuppayya commented on a change in pull request #28804: [SPARK-31973][SQL] Add ability to disable Sort,Spill in Partial aggregation

karuppayya commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r449790519



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
##########
@@ -877,44 +901,61 @@ case class HashAggregateExec(
       ("true", "true", "", "")
     }
 
-    val oomeClassName = classOf[SparkOutOfMemoryError].getName
+    val skipPartialAggregateThreshold = sqlContext.conf.skipPartialAggregateThreshold
+    val skipPartialAggRatio = sqlContext.conf.skipPartialAggregateRatio
 
+    val oomeClassName = classOf[SparkOutOfMemoryError].getName
+    val countTerm = ctx.addMutableState(CodeGenerator.JAVA_LONG, "count")
     val findOrInsertRegularHashMap: String =
       s"""
-         |// generate grouping key
-         |${unsafeRowKeyCode.code}
-         |int $unsafeRowKeyHash = ${unsafeRowKeyCode.value}.hashCode();
-         |if ($checkFallbackForBytesToBytesMap) {
-         |  // try to get the buffer from hash map
-         |  $unsafeRowBuffer =
-         |    $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, $unsafeRowKeyHash);
-         |}
-         |// Can't allocate buffer from the hash map. Spill the map and fallback to sort-based
-         |// aggregation after processing all input rows.
-         |if ($unsafeRowBuffer == null) {
-         |  if ($sorterTerm == null) {
-         |    $sorterTerm = $hashMapTerm.destructAndCreateExternalSorter();
-         |  } else {
-         |    $sorterTerm.merge($hashMapTerm.destructAndCreateExternalSorter());
+         |if (!$avoidSpillInPartialAggregateTerm) {
+         |  // generate grouping key
+         |  ${unsafeRowKeyCode.code}
+         |  int $unsafeRowKeyHash = ${unsafeRowKeyCode.value}.hashCode();
+         |  if ($checkFallbackForBytesToBytesMap) {
+         |    // try to get the buffer from hash map
+         |    $unsafeRowBuffer =
+         |      $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, $unsafeRowKeyHash);
          |  }
-         |  $resetCounter
-         |  // the hash map had be spilled, it should have enough memory now,
-         |  // try to allocate buffer again.
-         |  $unsafeRowBuffer = $hashMapTerm.getAggregationBufferFromUnsafeRow(
-         |    $unsafeRowKeys, $unsafeRowKeyHash);
-         |  if ($unsafeRowBuffer == null) {
-         |    // failed to allocate the first page
-         |    throw new $oomeClassName("No enough memory for aggregation");
+         |  // Can't allocate buffer from the hash map. Spill the map and fallback to sort-based
+         |  // aggregation after processing all input rows.
+         |  if ($unsafeRowBuffer == null && !$avoidSpillInPartialAggregateTerm) {

Review comment:
       @prakharjain09 
   The first check kicks in once we have decided to skip partial aggregation, in which case we won't attempt to fetch from the map.
   The second check kicks in only when the map is completely exhausted, and this is when we decide whether we have to skip partial aggregation.
   I think both checks are required.
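
   To illustrate why the two checks are not redundant, here is a minimal, self-contained sketch in plain Java. It is not the generated code from this PR: the class `PartialAggSketch`, the fixed `capacity` (standing in for the map running out of memory), and the pass-through list are all hypothetical, and the sketch assumes we always decide to skip once the map is exhausted, whereas the real decision would presumably consult the threshold and ratio configs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the generated partial-aggregation loop; not Spark code.
class PartialAggSketch {
    private final int capacity;                        // assumption: models the map's memory limit
    private final Map<String, Long> map = new HashMap<>();
    private final List<String> passedThrough = new ArrayList<>();
    private boolean avoidSpill = false;                // plays the role of $avoidSpillInPartialAggregateTerm

    PartialAggSketch(int capacity) {
        this.capacity = capacity;
    }

    void processRow(String key) {
        Long buffer = null;

        // First check: once we have decided to skip, never probe the map again.
        if (!avoidSpill) {
            buffer = map.get(key);
            if (buffer == null && map.size() < capacity) {
                buffer = 0L;                           // room left: allocate a new buffer
            }
        }

        // Second check: a null buffer while we are still aggregating means the
        // map is exhausted. This is the one place where the decision is made.
        if (buffer == null && !avoidSpill) {
            avoidSpill = true;                         // assumption: the sketch always chooses to skip
        }

        if (buffer != null) {
            map.put(key, buffer + 1);                  // aggregate (a simple count here)
        } else {
            passedThrough.add(key);                    // emit the row without aggregating
        }
    }

    Map<String, Long> groups() { return map; }
    List<String> passedThrough() { return passedThrough; }
    boolean avoidSpill() { return avoidSpill; }

    public static void main(String[] args) {
        PartialAggSketch sketch = new PartialAggSketch(2);
        for (String key : new String[] {"a", "b", "a", "c", "a", "d"}) {
            sketch.processRow(key);
        }
        System.out.println(sketch.groups() + " " + sketch.passedThrough());
    }
}
```

   In this sketch, dropping the first check would keep probing the map for every row after the decision, and dropping the flag from the second check would re-run the decision for every later row that has no buffer.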



