Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/12/29 02:12:52 UTC

[GitHub] [spark] sumeetgajjar opened a new pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

sumeetgajjar opened a new pull request #35047:
URL: https://github.com/apache/spark/pull/35047


   
   ### What changes were proposed in this pull request?
   This PR improves the performance of hash joins with many duplicate keys.
   
   A HashedRelation uses a map underneath to store rows against a corresponding key: LongHashedRelation uses a LongToUnsafeRowMap and UnsafeHashedRelation uses a BytesToBytesMap.
   We propose to reorder the underlying map so that all rows for a given key are adjacent in memory, improving spatial locality when iterating over them from the stream side of the join.
   
   This is achieved in the following steps:
   - creating another copy of the underlying map
   - for all keys in the existing map
     - get the corresponding rows 
     - insert all the rows for the given key at once in the new map
   - use the new map for look-ups
   
   This optimization can be enabled by specifying `spark.sql.hashedRelationReorderFactor=<value>`.
   Once the condition `number of rows >= number of unique keys * above value` is satisfied for the underlying map, the optimization will kick in.
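   
   For illustration, here is a minimal sketch of how the proposed config would be used and the gating condition it implies (the config name comes from this PR and is not part of a released Spark version; `shouldReorder` is just an illustrative helper, and `spark` is assumed to be an existing SparkSession, e.g. in spark-shell):
   
   ```scala
   // Proposed config from this PR (not available in released Spark versions).
   spark.conf.set("spark.sql.hashedRelationReorderFactor", "4")
   
   // The gating condition, written out as a small helper: rebuild the map only
   // when keys are duplicated heavily enough.
   def shouldReorder(reorderFactor: Option[Double], numUniqueKeys: Long, numRows: Long): Boolean =
     reorderFactor.exists(_ * numUniqueKeys <= numRows)
   
   // Example: with factor 4, 1M unique keys and 10M rows => 4 * 1e6 <= 1e7, so reorder.
   shouldReorder(Some(4.0), 1000000L, 10000000L)   // true
   ```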
   
   ### Why are the changes needed?
   No particular order is maintained when rows are added to the underlying map, so for a given key the corresponding rows are typically non-adjacent in memory, resulting in poor spatial locality. Placing the rows for a given key adjacent in memory improves cache behavior and thereby reduces execution time.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   - Modified existing unit tests to run against the suggested improvement.
   - Added a couple of cases to test the scenarios when the improvement throws an exception due to insufficient memory.
   - Added a micro-benchmark that clearly indicates performance improvements when there are duplicate keys.
   - Ran the four example queries mentioned in the JIRA in spark-sql as a final check for performance improvement.
   
   ### Credits
   This work is based on the initial idea proposed by @bersprockets.




[GitHub] [spark] sumeetgajjar commented on pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #35047:
URL: https://github.com/apache/spark/pull/35047#issuecomment-1006094278


   Hi @HyukjinKwon @cloud-fan can you please review this PR?




[GitHub] [spark] singhpk234 commented on a change in pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r776221930



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -1073,8 +1135,51 @@ private[joins] object LongHashedRelation {
         return HashedRelationWithAllNullKeys
       }
     }
-    map.optimize()
-    new LongHashedRelation(numFields, map)
+
+    val reorderMap = reorderFactor.exists(_ * map.numUniqueKeys <= map.numTotalValues)
+    val finalMap = if (reorderMap) {
+      // reorganize the hash map so that nodes of a given linked list are next to each other in
+      // memory.
+      logInfo(s"Reordering LongToUnsafeRowMap, numUniqueKeys: ${map.numUniqueKeys}, " +
+        s"numTotalValues: ${map.numTotalValues}")
+      // An exception due to insufficient memory can occur either during initialization or while
+      // adding rows to the map.
+      // 1. Failure occurs during initialization i.e. in LongToUnsafeRowMap.init:
+      // release of the partially allocated memory is already taken care of in the
+      // LongToUnsafeRowMap.ensureAcquireMemory method thus no further action is required.
+      // 2. Failure occurs while adding rows to the map: the partially allocated memory
+      // is not cleaned up, thus LongToUnsafeRowMap.free is invoked in the catch clause.
+      var maybeCompactMap: Option[LongToUnsafeRowMap] = None
+      try {
+        maybeCompactMap = Some(new LongToUnsafeRowMap(taskMemoryManager,
+          Math.toIntExact(map.numUniqueKeys)))
+        val compactMap = maybeCompactMap.get
+        val resultRow = new UnsafeRow(numFields)
+        map.keys().foreach { rowKey =>
+          val key = rowKey.getLong(0)
+          map.get(key, resultRow).foreach { row =>
+            compactMap.append(key, row)
+          }

Review comment:
       What I meant here is: if we know beforehand that we don't have this much memory available, should we short-circuit this rewrite by not attempting it at all? We have `getTotalMemoryConsumption` from the earlier map, and we can calculate the memory available to us at the start; if we know we can't support roughly 2x, we may save time by avoiding the extra work that happens when the exception is thrown.
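   
   A rough sketch of that short-circuit idea (purely illustrative, not actual Spark API usage; as noted later in this thread, `TaskMemoryManager` does not directly expose the available execution memory, so `availableExecutionMemory` is a hypothetical input here):
   
   ```scala
   // Skip the rewrite when the rebuilt map clearly would not fit: the rewrite
   // temporarily holds both the old and the new map, so require roughly the old
   // map's footprint again before attempting it.
   def canAffordRewrite(currentMapFootprintBytes: Long, availableExecutionMemory: Long): Boolean =
     currentMapFootprintBytes <= availableExecutionMemory
   
   // e.g. canAffordRewrite(oldMap.getTotalMemoryConsumption, availableExecutionMemory)
   ```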






[GitHub] [spark] sumeetgajjar commented on a change in pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r796302266



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -3380,6 +3380,17 @@ object SQLConf {
       .checkValue(_ >= 0, "The value must be non-negative.")
       .createWithDefault(8)
 
+  val HASHED_RELATION_REORDER_FACTOR = buildConf("spark.sql.hashedRelationReorderFactor")
+    .doc("The HashedRelation will be reordered if the number of unique keys times this factor is " +
+      "less than or equal to the total number of rows in the HashedRelation. " +
+      "The reordering places all rows with the same key adjacent to each other to improve " +
+      "spatial locality. This provides a performance boost while iterating over the rows for a " +
+      "given key due to increased cache hits")
+    .version("3.3.0")
+    .doubleConf
+    .checkValue(_ > 1, "The value must be greater than 1.")
+    .createOptional

Review comment:
       Apologies for the delayed response, I was stuck with some work stuff followed by a sick week due to covid.
   
   > @sumeetgajjar - could you help elaborate more why a global default value is sufficient per my question above?
   
   @c21 My rationale for suggesting a global value of 4 was based on an experiment I ran: a synthetic workload with the hash optimization enabled regardless of the duplication factor of the keys. As I gradually increased the duplication factor from 1 to 20, I noticed the optimization became beneficial once the duplication factor crossed 4. Based on that local experiment, I suggested a value of 4.
   
   > If the probe side does many lookups of keys with a lot of values, then we can see the improvement. But if the probe side does not look up these keys much, then we probably cannot see the benefit.
   
   I agree: the synthetic workload I was running queried the probe side such that the majority of the keys had multiple values.
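   
   For illustration, a rough sketch of the kind of synthetic workload described above (names and sizes are illustrative, not the exact workload I ran):
   
   ```scala
   import org.apache.spark.sql.{DataFrame, SparkSession}
   import org.apache.spark.sql.functions.col
   
   // Build side with every key duplicated `dupFactor` times, joined against a larger
   // stream side that repeatedly looks up those duplicated keys.
   def syntheticJoin(spark: SparkSession, dupFactor: Int): DataFrame = {
     val numKeys = 1000000L
     val build = spark.range(numKeys * dupFactor)
       .select((col("id") % numKeys).as("key"), col("id").as("v"))
     val stream = spark.range(numKeys * 100)
       .select((col("id") % numKeys).as("key"))
     stream.join(build, "key")
   }
   ```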
   
   Anyway, due to concerns over the added memory pressure introduced by this optimization and the feedback that the config is difficult to tune, I've decided to close the PR. If I find a better solution, I'll reopen it.






[GitHub] [spark] sumeetgajjar commented on a change in pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r779842230



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -1073,8 +1135,51 @@ private[joins] object LongHashedRelation {
         return HashedRelationWithAllNullKeys
       }
     }
-    map.optimize()
-    new LongHashedRelation(numFields, map)
+
+    val reorderMap = reorderFactor.exists(_ * map.numUniqueKeys <= map.numTotalValues)
+    val finalMap = if (reorderMap) {
+      // reorganize the hash map so that nodes of a given linked list are next to each other in
+      // memory.
+      logInfo(s"Reordering LongToUnsafeRowMap, numUniqueKeys: ${map.numUniqueKeys}, " +
+        s"numTotalValues: ${map.numTotalValues}")
+      // An exception due to insufficient memory can occur either during initialization or while
+      // adding rows to the map.
+      // 1. Failure occurs during initialization i.e. in LongToUnsafeRowMap.init:
+      // release of the partially allocated memory is already taken care of in the
+      // LongToUnsafeRowMap.ensureAcquireMemory method thus no further action is required.
+      // 2. Failure occurs while adding rows to the map: the partially allocated memory
+      // is not cleaned up, thus LongToUnsafeRowMap.free is invoked in the catch clause.
+      var maybeCompactMap: Option[LongToUnsafeRowMap] = None
+      try {
+        maybeCompactMap = Some(new LongToUnsafeRowMap(taskMemoryManager,
+          Math.toIntExact(map.numUniqueKeys)))
+        val compactMap = maybeCompactMap.get
+        val resultRow = new UnsafeRow(numFields)
+        map.keys().foreach { rowKey =>
+          val key = rowKey.getLong(0)
+          map.get(key, resultRow).foreach { row =>
+            compactMap.append(key, row)
+          }

Review comment:
       > I am also not sure of the performance penalty when doing a shuffled hash join with a large hash table in memory. We need to probe each key in the hash table to rebuild the table, and we kind of waste the time spent on the first build of the table.
   
   I totally agree about the time spent on the first build of the table. However, the time taken to rebuild the table is minuscule compared to the overall execution time of the query.
   This can be clearly seen in the above example.
   Also, in the above case I was running the Spark application in local mode. On Yarn or K8s, the overall execution time would increase due to the added network cost, and the table rebuild time would become an even smaller fraction of the overall time.






[GitHub] [spark] sumeetgajjar edited a comment on pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
sumeetgajjar edited a comment on pull request #35047:
URL: https://github.com/apache/spark/pull/35047#issuecomment-1006094278


   Hi @HyukjinKwon @cloud-fan @dongjoon-hyun can you please review this PR?




[GitHub] [spark] sumeetgajjar commented on a change in pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r779813987



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -476,23 +494,60 @@ private[joins] object UnsafeHashedRelation {
       numFields = row.numFields()
       val key = keyGenerator(row)
       if (!key.anyNull || allowsNullKey) {
-        val loc = binaryMap.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes)
-        if (!(ignoresDuplicatedKey && loc.isDefined)) {
-          val success = loc.append(
-            key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
-            row.getBaseObject, row.getBaseOffset, row.getSizeInBytes)
-          if (!success) {
-            binaryMap.free()
-            throw QueryExecutionErrors.cannotAcquireMemoryToBuildUnsafeHashedRelationError()
-          }
-        }
+        mapAppendHelper(key, row, binaryMap)
       } else if (isNullAware) {
         binaryMap.free()
         return HashedRelationWithAllNullKeys
       }
     }
 
-    new UnsafeHashedRelation(key.size, numFields, binaryMap)
+    val relation = new UnsafeHashedRelation(key.size, numFields, binaryMap)
+    val reorderMap = reorderFactor.exists(_ * binaryMap.numKeys() <= binaryMap.numValues())
+    if (reorderMap) {
+      // Reorganize the hash map so that nodes of a given linked list are next to each other in
+      // memory.
+      logInfo(s"Reordering BytesToBytesMap, uniqueKeys: ${binaryMap.numKeys()}, " +
+        s"totalNumValues: ${binaryMap.numValues()}")
+      // An exception due to insufficient memory can occur either during initialization or while
+      // adding rows to the map.
+      // 1. Failure occurs during initialization i.e. in BytesToBytesMap.allocate:
+      // release of the partially allocated memory is already taken care of through
+      // MemoryConsumer.allocateArray method thus no further action is required.
+      // 2. Failure occurs while adding rows to the map: mapAppendHelper invokes
+      // BytesToBytesMap.free method when a row cannot be appended to the map, thereby cleaning the
+      // partially allocated memory.
+      try {
+        val compactMap = new BytesToBytesMap(
+          taskMemoryManager,
+          // Only 70% of the slots can be used before growing; more capacity helps to reduce
+          // collisions
+          (binaryMap.numKeys() * 1.5 + 1).toInt,
+          pageSizeBytes)
+        // relation.keys() returns all keys and not just distinct keys thus distinct operation is
+        // applied to find unique keys
+        relation.keys().map(_.copy()).toSeq.distinct.foreach { key =>
+          relation.get(key).foreach { row =>
+            val unsafeRow = row.asInstanceOf[UnsafeRow]
+            val unsafeKey = keyGenerator(unsafeRow)
+            mapAppendHelper(unsafeKey, unsafeRow, compactMap)
+          }
+        }
+        relation.close()
+        logInfo("BytesToBytesMap reordered")
+        new UnsafeHashedRelation(key.size, numFields, compactMap)
+      } catch {
+        case e: SparkOutOfMemoryError =>
+          logWarning("Reordering BytesToBytesMap failed, " +

Review comment:
       Good catch, will modify the warning message.






[GitHub] [spark] AmplabJenkins commented on pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #35047:
URL: https://github.com/apache/spark/pull/35047#issuecomment-1002826626


   Can one of the admins verify this patch?




[GitHub] [spark] cloud-fan commented on a change in pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r779433582



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -1073,8 +1135,51 @@ private[joins] object LongHashedRelation {
         return HashedRelationWithAllNullKeys
       }
     }
-    map.optimize()
-    new LongHashedRelation(numFields, map)
+
+    val reorderMap = reorderFactor.exists(_ * map.numUniqueKeys <= map.numTotalValues)
+    val finalMap = if (reorderMap) {
+      // reorganize the hash map so that nodes of a given linked list are next to each other in
+      // memory.
+      logInfo(s"Reordering LongToUnsafeRowMap, numUniqueKeys: ${map.numUniqueKeys}, " +
+        s"numTotalValues: ${map.numTotalValues}")
+      // An exception due to insufficient memory can occur either during initialization or while
+      // adding rows to the map.
+      // 1. Failure occurs during initialization i.e. in LongToUnsafeRowMap.init:
+      // release of the partially allocated memory is already taken care of in the
+      // LongToUnsafeRowMap.ensureAcquireMemory method thus no further action is required.
+      // 2. Failure occurs while adding rows to the map: the partially allocated memory
+      // is not cleaned up, thus LongToUnsafeRowMap.free is invoked in the catch clause.
+      var maybeCompactMap: Option[LongToUnsafeRowMap] = None
+      try {
+        maybeCompactMap = Some(new LongToUnsafeRowMap(taskMemoryManager,
+          Math.toIntExact(map.numUniqueKeys)))
+        val compactMap = maybeCompactMap.get
+        val resultRow = new UnsafeRow(numFields)
+        map.keys().foreach { rowKey =>
+          val key = rowKey.getLong(0)
+          map.get(key, resultRow).foreach { row =>
+            compactMap.append(key, row)
+          }

Review comment:
       It's a valid concern that this puts more memory pressure on the driver. Is it possible to improve the relation-building logic and make it co-locate the values of the same key? Then we don't need to rewrite the relation.
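   
   For illustration, a rough sketch of that alternative (purely illustrative, not the PR's implementation): group the build-side rows by key before appending, so each key's rows are written contiguously in the first place. Note that the grouping itself buffers all rows, which is its own memory trade-off:
   
   ```scala
   // Append build-side rows grouped by key so that the rows of a given key land
   // next to each other in the underlying map, avoiding a later rewrite.
   def appendColocated[K, V](rows: Iterator[(K, V)], append: (K, V) => Unit): Unit = {
     rows.toSeq.groupBy(_._1).foreach { case (key, values) =>
       values.foreach { case (_, row) => append(key, row) }
     }
   }
   ```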






[GitHub] [spark] bersprockets commented on a change in pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
bersprockets commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r779885175



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -3380,6 +3380,17 @@ object SQLConf {
       .checkValue(_ >= 0, "The value must be non-negative.")
       .createWithDefault(8)
 
+  val HASHED_RELATION_REORDER_FACTOR = buildConf("spark.sql.hashedRelationReorderFactor")
+    .doc("The HashedRelation will be reordered if the number of unique keys times this factor is " +
+      "less than or equal to the total number of rows in the HashedRelation. " +
+      "The reordering places all rows with the same key adjacent to each other to improve " +
+      "spatial locality. This provides a performance boost while iterating over the rows for a " +
+      "given key due to increased cache hits")
+    .version("3.3.0")
+    .doubleConf
+    .checkValue(_ > 1, "The value must be greater than 1.")
+    .createOptional

Review comment:
       Maybe a default hash reorder factor of 4, but a separate config for on/off (off would mean the reorder factor gets passed to the various hash relation factories as None, regardless of the setting of spark.sql.hashedRelationReorderFactor).
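   
   For illustration, a rough sketch of what that could look like (config names, defaults, and version are illustrative only, not actual Spark configs):
   
   ```scala
   val HASHED_RELATION_REORDER_ENABLED = buildConf("spark.sql.hashedRelationReorder.enabled")
     .doc("When true, hash relations with heavily duplicated keys are rebuilt so that rows " +
       "sharing a key are adjacent in memory.")
     .version("3.3.0")
     .booleanConf
     .createWithDefault(false)
   
   val HASHED_RELATION_REORDER_FACTOR = buildConf("spark.sql.hashedRelationReorderFactor")
     .doc("Reorder the hash relation only if the total number of rows is at least this factor " +
       "times the number of unique keys. Takes effect only when " +
       "spark.sql.hashedRelationReorder.enabled is true.")
     .version("3.3.0")
     .doubleConf
     .checkValue(_ > 1, "The value must be greater than 1.")
     .createWithDefault(4.0)
   ```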






[GitHub] [spark] c21 commented on a change in pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r779935710



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -3380,6 +3380,17 @@ object SQLConf {
       .checkValue(_ >= 0, "The value must be non-negative.")
       .createWithDefault(8)
 
+  val HASHED_RELATION_REORDER_FACTOR = buildConf("spark.sql.hashedRelationReorderFactor")
+    .doc("The HashedRelation will be reordered if the number of unique keys times this factor is " +
+      "less than or equal to the total number of rows in the HashedRelation. " +
+      "The reordering places all rows with the same key adjacent to each other to improve " +
+      "spatial locality. This provides a performance boost while iterating over the rows for a " +
+      "given key due to increased cache hits")
+    .version("3.3.0")
+    .doubleConf
+    .checkValue(_ > 1, "The value must be greater than 1.")
+    .createOptional

Review comment:
   > I feel the improvement also depends on the access pattern from the probe/stream side. If the probe side does many lookups of keys with a lot of values, then we can see the improvement. But if the probe side does not look up these keys much, then we probably cannot see the benefit. I kind of feel that this config is not so easy to use in practice.
   
   > No, a global default value of 4, should suffice here.
   
   @sumeetgajjar - could you help elaborate more why a global default value is sufficient per my question above?






[GitHub] [spark] sumeetgajjar commented on a change in pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r779786446



##########
File path: project/SparkBuild.scala
##########
@@ -1141,7 +1141,8 @@ object TestSettings {
     (Test / javaOptions) += "-ea",
     (Test / javaOptions) ++= {
       val metaspaceSize = sys.env.get("METASPACE_SIZE").getOrElse("1300m")
-      val extraTestJavaArgs = Array("-XX:+IgnoreUnrecognizedVMOptions",

Review comment:
       The actual change is to expose an env variable to set the heap size. If set, this heap size will be used as `-Xmx` when invoking `runMain` from sbt.
   The default 4g heap was not sufficient when I was running the newly added benchmark with 10 billion rows (the driver OOM'd), hence the change.
   
   While making the change, I noticed that the `extraTestJavaArgs` `Array` was first converted to a string and then, on the very next line, split on spaces and converted back to a `Seq`.
   Thus I refactored the code a bit.
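   
   For illustration, the kind of sbt change described above might look roughly like this (the environment variable name is hypothetical, not necessarily what this PR uses):
   
   ```scala
   // In project/SparkBuild.scala (TestSettings): allow overriding the test JVM heap
   // size via an environment variable, falling back to the previous 4g default.
   (Test / javaOptions) ++= {
     val heapSize = sys.env.getOrElse("TEST_HEAP_SIZE", "4g")
     Seq(s"-Xmx$heapSize")
   }
   ```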






[GitHub] [spark] c21 commented on a change in pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r779818221



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -3380,6 +3380,17 @@ object SQLConf {
       .checkValue(_ >= 0, "The value must be non-negative.")
       .createWithDefault(8)
 
+  val HASHED_RELATION_REORDER_FACTOR = buildConf("spark.sql.hashedRelationReorderFactor")
+    .doc("The HashedRelation will be reordered if the number of unique keys times this factor is " +
+      "less than or equal to the total number of rows in the HashedRelation. " +
+      "The reordering places all rows with the same key adjacent to each other to improve " +
+      "spatial locality. This provides a performance boost while iterating over the rows for a " +
+      "given key due to increased cache hits")
+    .version("3.3.0")
+    .doubleConf
+    .checkValue(_ > 1, "The value must be greater than 1.")
+    .createOptional

Review comment:
       Yeah, I mean, do we expect users to tune the config for each Spark app/query? I feel the improvement also depends on the access pattern from the probe/stream side. If the probe side does many lookups of keys with a lot of values, then we can see the improvement. But if the probe side does not look up these keys much, then we probably cannot see the benefit. I kind of feel that this config is not so easy to use in practice.






[GitHub] [spark] cloud-fan commented on pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #35047:
URL: https://github.com/apache/spark/pull/35047#issuecomment-1007198645


   Since this introduces overhead (rebuild hash relation, more memory), I think we need to carefully make sure the benefit is larger than the overhead. Asking users to tune the config is really not a good way to roll out this optimization.
   
   Some random ideas:
   1. Look at the NDV of the join keys. If there are very few duplicated keys, don't do this optimization.
   2. Do it on the executor side, in a dynamic way. If we detect that a key is consistently looked up and has many values, compact the values of that key and put them into a new fast map. We can even set an upper bound on the number of keys to optimize, to avoid taking too much memory. We don't need to rewrite the entire hash relation.
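   
   For illustration, a rough, self-contained sketch of idea 2 (not Spark code; all names are illustrative): count lookups per key on the executor and, once a key proves hot, copy its rows into a bounded side cache so they become contiguous, without rewriting the whole relation:
   
   ```scala
   import scala.collection.mutable
   import scala.reflect.ClassTag
   
   class HotKeyCache[K, V: ClassTag](
       lookupThreshold: Int,
       maxCachedKeys: Int,
       fetch: K => Iterator[V]) {
     private val lookupCounts = mutable.Map.empty[K, Int]
     private val compacted = mutable.Map.empty[K, Array[V]]
   
     def get(key: K): Iterator[V] = compacted.get(key) match {
       case Some(rows) => rows.iterator  // already compacted: contiguous array scan
       case None =>
         val n = lookupCounts.getOrElse(key, 0) + 1
         lookupCounts(key) = n
         if (n >= lookupThreshold && compacted.size < maxCachedKeys) {
           val rows = fetch(key).toArray  // copy this key's rows next to each other
           compacted(key) = rows
           rows.iterator
         } else {
           fetch(key)
         }
     }
   }
   ```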






[GitHub] [spark] cloud-fan commented on a change in pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r779430658



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -1073,8 +1135,51 @@ private[joins] object LongHashedRelation {
         return HashedRelationWithAllNullKeys
       }
     }
-    map.optimize()
-    new LongHashedRelation(numFields, map)
+
+    val reorderMap = reorderFactor.exists(_ * map.numUniqueKeys <= map.numTotalValues)
+    val finalMap = if (reorderMap) {
+      // reorganize the hash map so that nodes of a given linked list are next to each other in
+      // memory.
+      logInfo(s"Reordering LongToUnsafeRowMap, numUniqueKeys: ${map.numUniqueKeys}, " +
+        s"numTotalValues: ${map.numTotalValues}")
+      // An exception due to insufficient memory can occur either during initialization or while
+      // adding rows to the map.
+      // 1. Failure occurs during initialization i.e. in LongToUnsafeRowMap.init:
+      // release of the partially allocated memory is already taken care of in the
+      // LongToUnsafeRowMap.ensureAcquireMemory method thus no further action is required.
+      // 2. Failure occurs while adding rows to the map: the partially allocated memory
+      // is not cleaned up, thus LongToUnsafeRowMap.free is invoked in the catch clause.
+      var maybeCompactMap: Option[LongToUnsafeRowMap] = None
+      try {
+        maybeCompactMap = Some(new LongToUnsafeRowMap(taskMemoryManager,
+          Math.toIntExact(map.numUniqueKeys)))

Review comment:
       nvm, it already does.






[GitHub] [spark] c21 commented on a change in pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r779783880



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -3380,6 +3380,17 @@ object SQLConf {
       .checkValue(_ >= 0, "The value must be non-negative.")
       .createWithDefault(8)
 
+  val HASHED_RELATION_REORDER_FACTOR = buildConf("spark.sql.hashedRelationReorderFactor")
+    .doc("The HashedRelation will be reordered if the number of unique keys times this factor is " +
+      "less than or equal to the total number of rows in the HashedRelation. " +
+      "The reordering places all rows with the same key adjacent to each other to improve " +
+      "spatial locality. This provides a performance boost while iterating over the rows for a " +
+      "given key due to increased cache hits")
+    .version("3.3.0")
+    .doubleConf
+    .checkValue(_ > 1, "The value must be greater than 1.")
+    .createOptional

Review comment:
       Curious what would be a good default value for this? Are we expecting users to tune this config for each query? If users need to tune it per query, then I feel this feature is less useful, because a user can always rewrite the query to sort on the join keys before the join.
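   
   For illustration, the rewrite mentioned above might look roughly like this at the DataFrame level (the column name is hypothetical, and whether the sort order actually survives into the hash relation is not guaranteed):
   
   ```scala
   import org.apache.spark.sql.DataFrame
   import org.apache.spark.sql.functions.broadcast
   
   // Sort the build side on the join key before joining, so that rows sharing a key
   // tend to be inserted into the hash relation next to each other.
   def joinWithSortedBuildSide(streamSide: DataFrame, buildSide: DataFrame): DataFrame =
     streamSide.join(broadcast(buildSide.sortWithinPartitions("key")), Seq("key"))
   ```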

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -1073,8 +1135,51 @@ private[joins] object LongHashedRelation {
         return HashedRelationWithAllNullKeys
       }
     }
-    map.optimize()
-    new LongHashedRelation(numFields, map)
+
+    val reorderMap = reorderFactor.exists(_ * map.numUniqueKeys <= map.numTotalValues)
+    val finalMap = if (reorderMap) {
+      // reorganize the hash map so that nodes of a given linked list are next to each other in
+      // memory.
+      logInfo(s"Reordering LongToUnsafeRowMap, numUniqueKeys: ${map.numUniqueKeys}, " +
+        s"numTotalValues: ${map.numTotalValues}")
+      // An exception due to insufficient memory can occur either during initialization or while
+      // adding rows to the map.
+      // 1. Failure occurs during initialization i.e. in LongToUnsafeRowMap.init:
+      // release of the partially allocated memory is already taken care of in the
+      // LongToUnsafeRowMap.ensureAcquireMemory method thus no further action is required.
+      // 2. Failure occurs while adding rows to the map: the partially allocated memory
+      // is not cleaned up, thus LongToUnsafeRowMap.free is invoked in the catch clause.
+      var maybeCompactMap: Option[LongToUnsafeRowMap] = None
+      try {
+        maybeCompactMap = Some(new LongToUnsafeRowMap(taskMemoryManager,
+          Math.toIntExact(map.numUniqueKeys)))
+        val compactMap = maybeCompactMap.get
+        val resultRow = new UnsafeRow(numFields)
+        map.keys().foreach { rowKey =>
+          val key = rowKey.getLong(0)
+          map.get(key, resultRow).foreach { row =>
+            compactMap.append(key, row)
+          }

Review comment:
       > It's a valid concern that this puts more memory pressure on the driver. Is it possible to improve the relation-building logic and make it co-locate the values of the same key? Then we don't need to rewrite the relation.
   
   +1 on this. Besides the memory concern, I am also not sure of the performance penalty when doing a shuffled hash join with a large hash table in memory. We need to probe each key in the hash table to rebuild the table, and we kind of waste the time spent on the first build of the table.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -476,23 +494,60 @@ private[joins] object UnsafeHashedRelation {
       numFields = row.numFields()
       val key = keyGenerator(row)
       if (!key.anyNull || allowsNullKey) {
-        val loc = binaryMap.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes)
-        if (!(ignoresDuplicatedKey && loc.isDefined)) {
-          val success = loc.append(
-            key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
-            row.getBaseObject, row.getBaseOffset, row.getSizeInBytes)
-          if (!success) {
-            binaryMap.free()
-            throw QueryExecutionErrors.cannotAcquireMemoryToBuildUnsafeHashedRelationError()
-          }
-        }
+        mapAppendHelper(key, row, binaryMap)
       } else if (isNullAware) {
         binaryMap.free()
         return HashedRelationWithAllNullKeys
       }
     }
 
-    new UnsafeHashedRelation(key.size, numFields, binaryMap)
+    val relation = new UnsafeHashedRelation(key.size, numFields, binaryMap)
+    val reorderMap = reorderFactor.exists(_ * binaryMap.numKeys() <= binaryMap.numValues())
+    if (reorderMap) {
+      // Reorganize the hash map so that nodes of a given linked list are next to each other in
+      // memory.
+      logInfo(s"Reordering BytesToBytesMap, uniqueKeys: ${binaryMap.numKeys()}, " +
+        s"totalNumValues: ${binaryMap.numValues()}")
+      // An exception due to insufficient memory can occur either during initialization or while
+      // adding rows to the map.
+      // 1. Failure occurs during initialization i.e. in BytesToBytesMap.allocate:
+      // release of the partially allocated memory is already taken care of through
+      // MemoryConsumer.allocateArray method thus no further action is required.
+      // 2. Failure occurs while adding rows to the map: mapAppendHelper invokes
+      // BytesToBytesMap.free method when a row cannot be appended to the map, thereby cleaning the
+      // partially allocated memory.
+      try {
+        val compactMap = new BytesToBytesMap(
+          taskMemoryManager,
+          // Only 70% of the slots can be used before growing; more capacity helps to reduce
+          // collisions
+          (binaryMap.numKeys() * 1.5 + 1).toInt,
+          pageSizeBytes)
+        // relation.keys() returns all keys and not just distinct keys thus distinct operation is
+        // applied to find unique keys
+        relation.keys().map(_.copy()).toSeq.distinct.foreach { key =>
+          relation.get(key).foreach { row =>
+            val unsafeRow = row.asInstanceOf[UnsafeRow]
+            val unsafeKey = keyGenerator(unsafeRow)
+            mapAppendHelper(unsafeKey, unsafeRow, compactMap)
+          }
+        }
+        relation.close()
+        logInfo("BytesToBytesMap reordered")
+        new UnsafeHashedRelation(key.size, numFields, compactMap)
+      } catch {
+        case e: SparkOutOfMemoryError =>
+          logWarning("Reordering BytesToBytesMap failed, " +

Review comment:
       HashedRelation building can happen either on the driver side (for broadcast join) or on the executor side (for shuffled hash join), so `try increasing the driver memory to mitigate it` is not an accurate suggestion for users. We probably want to suggest that users disable reordering, because it is what causes the OOM here.






[GitHub] [spark] sumeetgajjar edited a comment on pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
sumeetgajjar edited a comment on pull request #35047:
URL: https://github.com/apache/spark/pull/35047#issuecomment-1026521639


   Apologies for the delayed response, I was stuck with some work stuff followed by a sick week due to covid.
   
   > Some random ideas:
   
   Thanks for the suggestions, appreciate it.
   
   > Since this introduces overhead (rebuild hash relation, more memory), I think we need to carefully make sure the benefit is larger than the overhead. Asking users to tune the config is really not a good way to roll out this optimization.
   
   Agreed, in that case, I'll close this PR for the time being. In case I find a better solution, I'll reopen the PR.
   
   Thank you @cloud-fan @c21 @bersprockets @singhpk234 for your comments on the PR.




[GitHub] [spark] sumeetgajjar commented on a change in pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r776450291



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -1073,8 +1135,51 @@ private[joins] object LongHashedRelation {
         return HashedRelationWithAllNullKeys
       }
     }
-    map.optimize()
-    new LongHashedRelation(numFields, map)
+
+    val reorderMap = reorderFactor.exists(_ * map.numUniqueKeys <= map.numTotalValues)
+    val finalMap = if (reorderMap) {
+      // reorganize the hash map so that nodes of a given linked list are next to each other in
+      // memory.
+      logInfo(s"Reordering LongToUnsafeRowMap, numUniqueKeys: ${map.numUniqueKeys}, " +
+        s"numTotalValues: ${map.numTotalValues}")
+      // An exception due to insufficient memory can occur either during initialization or while
+      // adding rows to the map.
+      // 1. Failure occurs during initialization i.e. in LongToUnsafeRowMap.init:
+      // release of the partially allocated memory is already taken care of in the
+      // LongToUnsafeRowMap.ensureAcquireMemory method thus no further action is required.
+      // 2. Failure occurs while adding rows to the map: the partially allocated memory
+      // is not cleaned up, thus LongToUnsafeRowMap.free is invoked in the catch clause.
+      var maybeCompactMap: Option[LongToUnsafeRowMap] = None
+      try {
+        maybeCompactMap = Some(new LongToUnsafeRowMap(taskMemoryManager,
+          Math.toIntExact(map.numUniqueKeys)))
+        val compactMap = maybeCompactMap.get
+        val resultRow = new UnsafeRow(numFields)
+        map.keys().foreach { rowKey =>
+          val key = rowKey.getLong(0)
+          map.get(key, resultRow).foreach { row =>
+            compactMap.append(key, row)
+          }

Review comment:
       That sounds like a good idea; however, `TaskMemoryManager` does not expose an API to fetch the available execution memory. `MemoryManager.getExecutionMemoryUsageForTask` does exist, but unfortunately `MemoryManager` is not accessible at the point where the map is built.
   https://github.com/apache/spark/blob/4c806728eb955c07f7e095a6ff085a50d2eef806/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala#L216
   
   Furthermore, rebuilding the map takes only a small fraction of the total query execution time.
   I observed the following times while running a test with this feature:
   
   | Measurement | Value |
   | --- | --- |
   | Stream side | 300M rows |
   | Build side | 90M rows |
   | Rebuilding the map | 4 seconds (diff from the logs) |
   | Total query execution time with optimization enabled | 3.9 minutes |
   | Total query execution time with optimization disabled | 9.2 minutes |
   
   






[GitHub] [spark] sumeetgajjar commented on pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #35047:
URL: https://github.com/apache/spark/pull/35047#issuecomment-1026521639


   Apologies for the delayed response, I was stuck with some work stuff followed by a sick week due to covid.
   
   > Some random ideas:
   
   Thanks for the suggestions, appreciate it.
   
   > Since this introduces overhead (rebuild hash relation, more memory), I think we need to carefully make sure the benefit is larger than the overhead. Asking users to tune the config is really not a good way to roll out this optimization.
   
   Agreed, in that case, I'll close this PR for the time being. In case I find a better solution, I'll reopen the PR.




[GitHub] [spark] sumeetgajjar closed pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
sumeetgajjar closed pull request #35047:
URL: https://github.com/apache/spark/pull/35047


   




[GitHub] [spark] cloud-fan commented on a change in pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r779429831



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -1073,8 +1135,51 @@ private[joins] object LongHashedRelation {
         return HashedRelationWithAllNullKeys
       }
     }
-    map.optimize()
-    new LongHashedRelation(numFields, map)
+
+    val reorderMap = reorderFactor.exists(_ * map.numUniqueKeys <= map.numTotalValues)
+    val finalMap = if (reorderMap) {
+      // reorganize the hash map so that nodes of a given linked list are next to each other in
+      // memory.
+      logInfo(s"Reordering LongToUnsafeRowMap, numUniqueKeys: ${map.numUniqueKeys}, " +
+        s"numTotalValues: ${map.numTotalValues}")
+      // An exception due to insufficient memory can occur either during initialization or while
+      // adding rows to the map.
+      // 1. Failure occurs during initialization i.e. in LongToUnsafeRowMap.init:
+      // release of the partially allocated memory is already taken care of in the
+      // LongToUnsafeRowMap.ensureAcquireMemory method thus no further action is required.
+      // 2. Failure occurs while adding rows to the map: the partially allocated memory
+      // is not cleaned up, thus LongToUnsafeRowMap.free is invoked in the catch clause.
+      var maybeCompactMap: Option[LongToUnsafeRowMap] = None
+      try {
+        maybeCompactMap = Some(new LongToUnsafeRowMap(taskMemoryManager,
+          Math.toIntExact(map.numUniqueKeys)))

Review comment:
       shall we let this `LongToUnsafeRowMap` allocate all the pages ahead of time, so that we can fail earlier if there is not enough memory?
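   For illustration only, a minimal plain-Scala sketch of the "fail early" idea (the `reserve` helper below is hypothetical and stands in for acquiring task execution memory; it is not a Spark API): estimate an upper bound for the compact map from the existing map's footprint and reserve it before copying any rows, so an insufficient budget fails immediately instead of part-way through the copy.

   ```scala
   // Hypothetical sketch: reserve the estimated memory up front, then copy.
   object FailEarlyReservation {
     def reserve(budgetBytes: Long, neededBytes: Long): Unit =
       if (neededBytes > budgetBytes)
         throw new OutOfMemoryError(s"need $neededBytes bytes, only $budgetBytes available")

     def main(args: Array[String]): Unit = {
       val originalMapBytes = 512L * 1024 * 1024   // footprint of the existing map (assumed known)
       val budget           = 2048L * 1024 * 1024  // memory still available to the task (assumed)
       reserve(budget, originalMapBytes)           // assume the compact copy needs at most about this much
       println("reservation succeeded; safe to start copying rows")
     }
   }
   ```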






[GitHub] [spark] sumeetgajjar commented on a change in pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r779842035



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -1073,8 +1135,51 @@ private[joins] object LongHashedRelation {
         return HashedRelationWithAllNullKeys
       }
     }
-    map.optimize()
-    new LongHashedRelation(numFields, map)
+
+    val reorderMap = reorderFactor.exists(_ * map.numUniqueKeys <= map.numTotalValues)
+    val finalMap = if (reorderMap) {
+      // reorganize the hash map so that nodes of a given linked list are next to each other in
+      // memory.
+      logInfo(s"Reordering LongToUnsafeRowMap, numUniqueKeys: ${map.numUniqueKeys}, " +
+        s"numTotalValues: ${map.numTotalValues}")
+      // An exception due to insufficient memory can occur either during initialization or while
+      // adding rows to the map.
+      // 1. Failure occurs during initialization i.e. in LongToUnsafeRowMap.init:
+      // release of the partially allocated memory is already taken care of in the
+      // LongToUnsafeRowMap.ensureAcquireMemory method thus no further action is required.
+      // 2. Failure occurs while adding rows to the map: the partially allocated memory
+      // is not cleaned up, thus LongToUnsafeRowMap.free is invoked in the catch clause.
+      var maybeCompactMap: Option[LongToUnsafeRowMap] = None
+      try {
+        maybeCompactMap = Some(new LongToUnsafeRowMap(taskMemoryManager,
+          Math.toIntExact(map.numUniqueKeys)))
+        val compactMap = maybeCompactMap.get
+        val resultRow = new UnsafeRow(numFields)
+        map.keys().foreach { rowKey =>
+          val key = rowKey.getLong(0)
+          map.get(key, resultRow).foreach { row =>
+            compactMap.append(key, row)
+          }

Review comment:
       > Is it possible to improve the relation-building logic and make it co-locate the values of the same key?
   
   Since there is no ordering guarantee with `Iterator[InternalRow]`, I don't believe there is a way to co-locate the values of the same key when building the relation, but let me think about it again.
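   For illustration, a minimal plain-Scala sketch (standard collections only, not Spark internals) of why co-locating at build time would require buffering: with no ordering guarantee on the input, all rows must be held and grouped by key before the values of a key can be written adjacently, which is the cost the reply above is pointing at.

   ```scala
   // Group-before-insert sketch: buffering the whole input is the price of
   // co-locating values while the relation is being built.
   object GroupBeforeInsert {
     def main(args: Array[String]): Unit = {
       // (key, payload) pairs arriving in no particular key order
       val rows = Iterator((1L, "a"), (2L, "b"), (1L, "c"), (3L, "d"), (1L, "e"))

       // Buffer everything, then group so values of the same key become adjacent
       val grouped: Map[Long, Seq[String]] =
         rows.toSeq.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2) }

       grouped.foreach { case (k, vs) => println(s"key=$k values=${vs.mkString(",")}") }
     }
   }
   ```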






[GitHub] [spark] singhpk234 commented on a change in pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r776171704



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -1073,8 +1135,51 @@ private[joins] object LongHashedRelation {
         return HashedRelationWithAllNullKeys
       }
     }
-    map.optimize()
-    new LongHashedRelation(numFields, map)
+
+    val reorderMap = reorderFactor.exists(_ * map.numUniqueKeys <= map.numTotalValues)
+    val finalMap = if (reorderMap) {
+      // reorganize the hash map so that nodes of a given linked list are next to each other in
+      // memory.
+      logInfo(s"Reordering LongToUnsafeRowMap, numUniqueKeys: ${map.numUniqueKeys}, " +
+        s"numTotalValues: ${map.numTotalValues}")
+      // An exception due to insufficient memory can occur either during initialization or while
+      // adding rows to the map.
+      // 1. Failure occurs during initialization i.e. in LongToUnsafeRowMap.init:
+      // release of the partially allocated memory is already taken care of in the
+      // LongToUnsafeRowMap.ensureAcquireMemory method thus no further action is required.
+      // 2. Failure occurs while adding rows to the map: the partially allocated memory
+      // is not cleaned up, thus LongToUnsafeRowMap.free is invoked in the catch clause.
+      var maybeCompactMap: Option[LongToUnsafeRowMap] = None
+      try {
+        maybeCompactMap = Some(new LongToUnsafeRowMap(taskMemoryManager,
+          Math.toIntExact(map.numUniqueKeys)))
+        val compactMap = maybeCompactMap.get
+        val resultRow = new UnsafeRow(numFields)
+        map.keys().foreach { rowKey =>
+          val key = rowKey.getLong(0)
+          map.get(key, resultRow).foreach { row =>
+            compactMap.append(key, row)
+          }

Review comment:
       [question] IIUC, at this point we will have both the old map and the new compact map in memory, so we would be holding 2X the memory we held before (when we just had 1 map). This by itself can cause an OOM. Do we need some heuristic here? Or perhaps delete/free entries (not sure we have this functionality at present) once they have been copied into the compactMap?






[GitHub] [spark] sumeetgajjar commented on a change in pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r776197746



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -1073,8 +1135,51 @@ private[joins] object LongHashedRelation {
         return HashedRelationWithAllNullKeys
       }
     }
-    map.optimize()
-    new LongHashedRelation(numFields, map)
+
+    val reorderMap = reorderFactor.exists(_ * map.numUniqueKeys <= map.numTotalValues)
+    val finalMap = if (reorderMap) {
+      // reorganize the hash map so that nodes of a given linked list are next to each other in
+      // memory.
+      logInfo(s"Reordering LongToUnsafeRowMap, numUniqueKeys: ${map.numUniqueKeys}, " +
+        s"numTotalValues: ${map.numTotalValues}")
+      // An exception due to insufficient memory can occur either during initialization or while
+      // adding rows to the map.
+      // 1. Failure occurs during initialization i.e. in LongToUnsafeRowMap.init:
+      // release of the partially allocated memory is already taken care of in the
+      // LongToUnsafeRowMap.ensureAcquireMemory method thus no further action is required.
+      // 2. Failure occurs while adding rows to the map: the partially allocated memory
+      // is not cleaned up, thus LongToUnsafeRowMap.free is invoked in the catch clause.
+      var maybeCompactMap: Option[LongToUnsafeRowMap] = None
+      try {
+        maybeCompactMap = Some(new LongToUnsafeRowMap(taskMemoryManager,
+          Math.toIntExact(map.numUniqueKeys)))
+        val compactMap = maybeCompactMap.get
+        val resultRow = new UnsafeRow(numFields)
+        map.keys().foreach { rowKey =>
+          val key = rowKey.getLong(0)
+          map.get(key, resultRow).foreach { row =>
+            compactMap.append(key, row)
+          }

Review comment:
       In case an OOM-related exception is thrown while appending to the map, we invoke `maybeCompactMap.foreach(_.free())` in the catch clause, which releases the memory held by the `compactMap`.
   
   https://github.com/apache/spark/pull/35047/files#diff-127291a0287f790755be5473765ea03eb65f8b58b9ec0760955f124e21e3452fR1171
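   For readers following along, a simplified plain-Scala sketch of the cleanup pattern being described (the `FakeMap` class below is only a stand-in for `LongToUnsafeRowMap`, not the real implementation): if building the compact copy fails part-way, whatever the copy has already acquired is freed before the error propagates.

   ```scala
   object CompactWithCleanup {
     final class FakeMap {                         // stand-in for LongToUnsafeRowMap
       def append(key: Long, value: String): Unit = ()
       def free(): Unit = println("freed partially built compact map")
     }

     def compact(entries: Seq[(Long, String)]): FakeMap = {
       var maybeCompact: Option[FakeMap] = None
       try {
         maybeCompact = Some(new FakeMap)
         val compactMap = maybeCompact.get
         entries.foreach { case (k, v) => compactMap.append(k, v) }
         compactMap
       } catch {
         case e: OutOfMemoryError =>
           maybeCompact.foreach(_.free())          // release the partial copy, then rethrow
           throw e
       }
     }

     def main(args: Array[String]): Unit = {
       compact(Seq(1L -> "a", 1L -> "b", 2L -> "c"))
       println("compact copy built without hitting the catch clause")
     }
   }
   ```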






[GitHub] [spark] sumeetgajjar commented on a change in pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r779813781



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -3380,6 +3380,17 @@ object SQLConf {
       .checkValue(_ >= 0, "The value must be non-negative.")
       .createWithDefault(8)
 
+  val HASHED_RELATION_REORDER_FACTOR = buildConf("spark.sql.hashedRelationReorderFactor")
+    .doc("The HashedRelation will be reordered if the number of unique keys times this factor is " +
+      "less than equal to the total number of rows in the HashedRelation. " +
+      "The reordering places all rows with the same key adjacent to each other to improve " +
+      "spatial locality. This provides a performance boost while iterating over the rows for a " +
+      "given key due to increased cache hits")
+    .version("3.3.0")
+    .doubleConf
+    .checkValue(_ > 1, "The value must be greater than 1.")
+    .createOptional

Review comment:
       As per the SQL benchmarks (not the micro-benchmark) that I ran, 4 is a good default value for this config: it reduced the query processing time to about half.
   
   The config was set at the Spark application level, not per query.
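   As a worked example of the trigger condition (the numbers are made up for illustration; the predicate mirrors `reorderFactor.exists(_ * map.numUniqueKeys <= map.numTotalValues)` from the patch):

   ```scala
   object ReorderTrigger {
     // Reordering kicks in when factor * uniqueKeys <= totalValues,
     // i.e. when each key has at least `factor` rows on average.
     def shouldReorder(factor: Option[Double], uniqueKeys: Long, totalValues: Long): Boolean =
       factor.exists(_ * uniqueKeys <= totalValues)

     def main(args: Array[String]): Unit = {
       println(shouldReorder(Some(4.0), uniqueKeys = 10000000L, totalValues = 50000000L)) // true:  40M <= 50M
       println(shouldReorder(Some(4.0), uniqueKeys = 10000000L, totalValues = 30000000L)) // false: 40M >  30M
       println(shouldReorder(None,      uniqueKeys = 10000000L, totalValues = 90000000L)) // false: config unset
     }
   }
   ```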






[GitHub] [spark] sumeetgajjar commented on a change in pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r779922869



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -3380,6 +3380,17 @@ object SQLConf {
       .checkValue(_ >= 0, "The value must be non-negative.")
       .createWithDefault(8)
 
+  val HASHED_RELATION_REORDER_FACTOR = buildConf("spark.sql.hashedRelationReorderFactor")
+    .doc("The HashedRelation will be reordered if the number of unique keys times this factor is " +
+      "less than equal to the total number of rows in the HashedRelation. " +
+      "The reordering places all rows with the same key adjacent to each other to improve " +
+      "spatial locality. This provides a performance boost while iterating over the rows for a " +
+      "given key due to increased cache hits")
+    .version("3.3.0")
+    .doubleConf
+    .checkValue(_ > 1, "The value must be greater than 1.")
+    .createOptional

Review comment:
       > Maybe a default hash reorder factor of 4, but a separate config for on/off (off would mean the reorder factor gets passed to the various hash relation factories as None, regardless of the setting of spark.sql.hashedRelationReorderFactor).
   
   I like the idea; I'll add it.
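   A minimal sketch of the on/off-plus-factor resolution being proposed (plain Scala; the names below are hypothetical, not actual SQLConf entries): the boolean switch decides whether any factor is handed to the hash-relation factories at all.

   ```scala
   object ReorderConfResolution {
     // When disabled, the factories receive None and never reorder,
     // regardless of the configured factor.
     def effectiveReorderFactor(enabled: Boolean, factor: Double): Option[Double] =
       if (enabled) Some(factor) else None

     def main(args: Array[String]): Unit = {
       println(effectiveReorderFactor(enabled = true,  factor = 4.0))  // Some(4.0)
       println(effectiveReorderFactor(enabled = false, factor = 4.0))  // None
     }
   }
   ```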






[GitHub] [spark] sumeetgajjar commented on a change in pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r779862873



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -3380,6 +3380,17 @@ object SQLConf {
       .checkValue(_ >= 0, "The value must be non-negative.")
       .createWithDefault(8)
 
+  val HASHED_RELATION_REORDER_FACTOR = buildConf("spark.sql.hashedRelationReorderFactor")
+    .doc("The HashedRelation will be reordered if the number of unique keys times this factor is " +
+      "less than equal to the total number of rows in the HashedRelation. " +
+      "The reordering places all rows with the same key adjacent to each other to improve " +
+      "spatial locality. This provides a performance boost while iterating over the rows for a " +
+      "given key due to increased cache hits")
+    .version("3.3.0")
+    .doubleConf
+    .checkValue(_ > 1, "The value must be greater than 1.")
+    .createOptional

Review comment:
       > do we expect users to tune the config per each Spark app/query?
   
   No, a global default value of 4 should suffice here.






[GitHub] [spark] cloud-fan commented on a change in pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r779421886



##########
File path: project/SparkBuild.scala
##########
@@ -1141,7 +1141,8 @@ object TestSettings {
     (Test / javaOptions) += "-ea",
     (Test / javaOptions) ++= {
       val metaspaceSize = sys.env.get("METASPACE_SIZE").getOrElse("1300m")
-      val extraTestJavaArgs = Array("-XX:+IgnoreUnrecognizedVMOptions",

Review comment:
       what's the actual change?



