Posted to commits@spark.apache.org by do...@apache.org on 2020/09/14 21:07:52 UTC

[spark] branch branch-3.0 updated: [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 990d49a  [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold
990d49a is described below

commit 990d49a28d6b7d1bd011aa7f1666217bcc920a77
Author: Ankur Dave <an...@gmail.com>
AuthorDate: Mon Sep 14 13:58:15 2020 -0700

    [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold
    
    ### What changes were proposed in this pull request?
    
    When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, `numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` is false. This correctly prevents the map from growing, but `canGrowArray` incorrectly remains true. Therefore the map keeps accepting new keys and exceeds its growth threshold. If we attempt to spill the map in this state, the UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By this point the task has typically consumed all its memory, so the allocation of a new pointer array is likely to fail.
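
    The spill-time constraint is the crux of the bug. As a rough sketch, the array-reuse decision in UnsafeKVExternalSorter looks like the following (paraphrased, not the verbatim source; the method name here is illustrative):
    
    ```java
    // Paraphrased sketch, not the verbatim UnsafeKVExternalSorter source.
    // At spill time the sorter can reuse the map's long array as its sort
    // pointer array only if the map respected its load factor of at most
    // one key per two array entries:
    static boolean canReuseLongArrayForSort(long numKeys, long longArraySize) {
      return numKeys <= longArraySize / 2;
    }
    // When this returns false, a fresh pointer array must be allocated, and
    // by that point the task has typically exhausted its memory budget.
    ```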
    
    This PR fixes the issue by setting `canGrowArray` to false in this case. This prevents the map from accepting new elements when it cannot grow to accommodate them.
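
    Downstream, the flag gates the insert path. A paraphrased sketch of the kind of check `Location.append()` performs (illustrative, not the verbatim source; the actual change appears in the diff at the end of this message):
    
    ```java
    // Paraphrased sketch of the append-path gate, not the verbatim source:
    // once canGrowArray is false and the growth threshold has been reached,
    // inserts of new keys fail, which signals the caller to spill.
    if (!canGrowArray && numKeys >= growthThreshold) {
      return false; // reject the new key rather than exceed the load factor
    }
    ```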
    
    ### Why are the changes needed?
    
    Without this change, hash aggregations will fail when the number of groups per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million), and when the grouping aggregation is the only memory-consuming operator in its stage.
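
    A back-of-the-envelope check of those numbers (a standalone sketch; the constants follow BytesToBytesMap, but the class name here is illustrative):
    
    ```java
    public class GrowthThresholdMath {
      public static void main(String[] args) {
        final int maxCapacity = 1 << 29;      // BytesToBytesMap.MAX_CAPACITY
        final double loadFactor = 0.5;        // growth threshold = half capacity
        final long growthThreshold = (long) (maxCapacity * loadFactor);
        System.out.println(growthThreshold);  // 268435456 = 2^28 keys
        // Two 8-byte entries per key, so the long array at MAX_CAPACITY
        // holds 2^30 longs, i.e. 8 GiB that cannot grow any further.
        final long arrayBytes = 2L * maxCapacity * 8;
        System.out.println(arrayBytes >> 30); // 8 (GiB)
      }
    }
    ```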
    
    For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` fails when `tbl` contains 1 billion distinct values and when `spark.sql.shuffle.partitions=1`.
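
    A hedged sketch of that repro (the app name and data generation are assumptions, and running it requires a machine with enough memory for the final aggregation's hash map):
    
    ```java
    import org.apache.spark.sql.SparkSession;
    
    public class Spark32872Repro {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("SPARK-32872-repro")
            .config("spark.sql.shuffle.partitions", "1")
            .getOrCreate();
        // One billion distinct ids; the single final partition must then
        // track ~10^9 groups, far beyond MAX_CAPACITY / 2 = 2^28.
        spark.range(1000L * 1000 * 1000).createOrReplaceTempView("tbl");
        spark.sql("SELECT COUNT(DISTINCT id) FROM tbl").show();
        spark.stop();
      }
    }
    ```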
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Reproducing this issue requires building a very large BytesToBytesMap. Because this is infeasible to do in a unit test, this PR was tested manually by adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the test fails in 8.5 minutes. With this PR, the test passes in 1.5 minutes.
    
    ```java
    public abstract class AbstractBytesToBytesMapSuite {
      // ...
      @Test
      public void respectGrowthThresholdAtMaxCapacity() {
        TestMemoryManager memoryManager2 =
            new TestMemoryManager(
                new SparkConf()
                .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
                .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 25600 * 1024 * 1024L)
                .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)
                .set(package$.MODULE$.SHUFFLE_COMPRESS(), false));
        TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
        final long pageSizeBytes = 8000000 + 8; // 8 bytes for end-of-page marker
        final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);
    
        try {
          // Insert keys into the map until it stops accepting new keys.
          for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
            if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
            final long[] value = new long[]{i};
            BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
            Assert.assertFalse(loc.isDefined());
            boolean success =
                loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
            if (!success) break;
          }
    
          // The map should grow to its max capacity.
          long capacity = map.getArray().size() / 2;
          Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY);
    
          // The map should stop accepting new keys once it has reached its growth
          // threshold, which is half the max capacity.
          Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2);
    
          map.free();
        } finally {
          map.free();
        }
      }
    }
    ```
    
    Closes #29744 from ankurdave/SPARK-32872.
    
    Authored-by: Ankur Dave <an...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit 72550c3be7120fcf2844d6914e883f1bec30d93f)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java    | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 6e02888..4036856 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -738,12 +738,21 @@ public final class BytesToBytesMap extends MemoryConsumer {
         longArray.set(pos * 2 + 1, keyHashcode);
         isDefined = true;
 
-        // We use two array entries per key, so the array size is twice the capacity.
-        // We should compare the current capacity of the array, instead of its size.
-        if (numKeys >= growthThreshold && longArray.size() / 2 < MAX_CAPACITY) {
-          try {
-            growAndRehash();
-          } catch (SparkOutOfMemoryError oom) {
+        // If the map has reached its growth threshold, try to grow it.
+        if (numKeys >= growthThreshold) {
+          // We use two array entries per key, so the array size is twice the capacity.
+          // We should compare the current capacity of the array, instead of its size.
+          if (longArray.size() / 2 < MAX_CAPACITY) {
+            try {
+              growAndRehash();
+            } catch (SparkOutOfMemoryError oom) {
+              canGrowArray = false;
+            }
+          } else {
+            // The map is already at MAX_CAPACITY and cannot grow. Instead, we prevent it from
+            // accepting any more new elements to make sure we don't exceed the load factor. If we
+            // need to spill later, this allows UnsafeKVExternalSorter to reuse the array for
+            // sorting.
             canGrowArray = false;
           }
         }

