Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/12/29 06:32:57 UTC

[GitHub] [spark] singhpk234 commented on a change in pull request #35047: [SPARK-37175][SQL] Performance improvement to hash joins with many duplicate keys

singhpk234 commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r776171704



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -1073,8 +1135,51 @@ private[joins] object LongHashedRelation {
         return HashedRelationWithAllNullKeys
       }
     }
-    map.optimize()
-    new LongHashedRelation(numFields, map)
+
+    val reorderMap = reorderFactor.exists(_ * map.numUniqueKeys <= map.numTotalValues)
+    val finalMap = if (reorderMap) {
+      // reorganize the hash map so that nodes of a given linked list are next to each other in
+      // memory.
+      logInfo(s"Reordering LongToUnsafeRowMap, numUniqueKeys: ${map.numUniqueKeys}, " +
+        s"numTotalValues: ${map.numTotalValues}")
+      // An exception due to insufficient memory can occur either during initialization or while
+      // adding rows to the map.
+      // 1. Failure occurs during initialization, i.e. in LongToUnsafeRowMap.init:
+      // release of the partially allocated memory is already taken care of in the
+      // LongToUnsafeRowMap.ensureAcquireMemory method, so no further action is required.
+      // 2. Failure occurs while adding rows to the map: the partially allocated memory
+      // is not cleaned up, thus LongToUnsafeRowMap.free is invoked in the catch clause.
+      var maybeCompactMap: Option[LongToUnsafeRowMap] = None
+      try {
+        maybeCompactMap = Some(new LongToUnsafeRowMap(taskMemoryManager,
+          Math.toIntExact(map.numUniqueKeys)))
+        val compactMap = maybeCompactMap.get
+        val resultRow = new UnsafeRow(numFields)
+        map.keys().foreach { rowKey =>
+          val key = rowKey.getLong(0)
+          map.get(key, resultRow).foreach { row =>
+            compactMap.append(key, row)
+          }

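As a quick standalone illustration of the new trigger condition above (the numbers are made up and stand in for the map's numUniqueKeys and numTotalValues; this shows only the arithmetic, not the real LongToUnsafeRowMap):

    // Minimal sketch of the reorder trigger; all values here are assumptions.
    val reorderFactor: Option[Long] = Some(10L) // hypothetical threshold knob
    val numUniqueKeys = 1000L                   // distinct join keys in the map
    val numTotalValues = 50000L                 // total rows, ~50 duplicates/key

    // Rebuild only when the average duplicates per key reach the factor:
    // 10 * 1000 <= 50000 holds, so this map would be reordered.
    val reorderMap = reorderFactor.exists(_ * numUniqueKeys <= numTotalValues)
    println(reorderMap) // prints: true
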
Review comment:
       [question] IIUC, at this point we will have both the old map and the new compact map in memory, so we would be holding 2X the memory we were holding before (when we had just one map). This by itself can cause an OOM. Do we need some heuristic here? Or perhaps delete/free entries from the old map once they have been inserted into the compactMap (not sure we have this functionality at present)?
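       As a rough standalone sketch of that incremental-free idea, using a plain scala.collection.mutable.LongMap as a stand-in (the real LongToUnsafeRowMap keeps rows in task-managed memory and, as noted above, has no per-entry free today, so the release step here is only hypothesized):

           import scala.collection.mutable

           // Toy stand-in: key -> its duplicate rows. Not the real LongToUnsafeRowMap.
           def compactWithIncrementalFree(
               old: mutable.LongMap[mutable.ArrayBuffer[String]])
             : mutable.LongMap[mutable.ArrayBuffer[String]] = {
             val compact = mutable.LongMap.empty[mutable.ArrayBuffer[String]]
             // Snapshot the keys first so removing entries does not break the iteration.
             old.keys.toArray.foreach { key =>
               compact(key) = old(key) // move this key's chain into the compact map
               old.remove(key)         // release the old entry immediately, so the
                                       // peak stays near 1X the data, not 2X
             }
             compact
           }

       With the real map this would need an explicit per-entry memory release; the removal above only illustrates the ordering (copy a key, then free it) that would cap the transient footprint.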




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


