You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2021/10/27 13:59:19 UTC

[spark] branch master updated: [SPARK-37071][CORE] Make OpenHashMap serialize without reference tracking

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3319361  [SPARK-37071][CORE] Make OpenHashMap serialize without reference tracking
3319361 is described below

commit 3319361ca67212d2ae373bb46c5b6f2d80d792a4
Author: Emil Ejbyfeldt <ee...@liveintent.com>
AuthorDate: Wed Oct 27 08:58:37 2021 -0500

    [SPARK-37071][CORE] Make OpenHashMap serialize without reference tracking
    
    ### What changes were proposed in this pull request?
    Change the anonymous functions in OpenHashMap to member methods. This avoids having a member that captures the OpenHashMap object in its closure. This fix allows OpenHashMap instances to be serialized with Kryo when reference tracking is turned off.
    
    I am not sure why the original implementation had the anonymous function members in the first place. But if it was implemented that way for performance reasons, another possible fix is just to mark the `grow` and `move` members as transient.
    
    ### Why are the changes needed?
    Users might want to turn off referenceTracking in Kryo since it has performance benefits, but currently this will unnecessarily and unexpectedly prevent them from using some features of Spark that use OpenHashMap internally.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing tests and a new test in the `KryoSerializerSuite`.
    
    Closes #34351 from eejbyfeldt/SPARK-37071-make-open-hash-map-serialize-without-reference-tracking.
    
    Authored-by: Emil Ejbyfeldt <ee...@liveintent.com>
    Signed-off-by: Sean Owen <sr...@gmail.com>
---
 .../scala/org/apache/spark/util/collection/OpenHashMap.scala  |  9 ++-------
 .../spark/util/collection/PrimitiveKeyOpenHashMap.scala       |  9 ++-------
 .../org/apache/spark/serializer/KryoSerializerSuite.scala     | 11 +++++++++++
 .../util/collection/GraphXPrimitiveKeyOpenHashMap.scala       |  9 ++-------
 4 files changed, 17 insertions(+), 21 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
index 1200ac0..79e1a35 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
@@ -149,17 +149,12 @@ class OpenHashMap[K : ClassTag, @specialized(Long, Int, Double) V: ClassTag](
     }
   }
 
-  // The following member variables are declared as protected instead of private for the
-  // specialization to work (specialized class extends the non-specialized one and needs access
-  // to the "private" variables).
-  // They also should have been val's. We use var's because there is a Scala compiler bug that
-  // would throw illegal access error at runtime if they are declared as val's.
-  protected var grow = (newCapacity: Int) => {
+  private def grow(newCapacity: Int): Unit = {
     _oldValues = _values
     _values = new Array[V](newCapacity)
   }
 
-  protected var move = (oldPos: Int, newPos: Int) => {
+  private def move(oldPos: Int, newPos: Int): Unit = {
     _values(newPos) = _oldValues(oldPos)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
index 7a50d85..69665aa 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
@@ -117,17 +117,12 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
     }
   }
 
-  // The following member variables are declared as protected instead of private for the
-  // specialization to work (specialized class extends the unspecialized one and needs access
-  // to the "private" variables).
-  // They also should have been val's. We use var's because there is a Scala compiler bug that
-  // would throw illegal access error at runtime if they are declared as val's.
-  protected var grow = (newCapacity: Int) => {
+  private def grow(newCapacity: Int): Unit = {
     _oldValues = _values
     _values = new Array[V](newCapacity)
   }
 
-  protected var move = (oldPos: Int, newPos: Int) => {
+  private def move(oldPos: Int, newPos: Int): Unit = {
     _values(newPos) = _oldValues(oldPos)
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 229ef69..dd2340a 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -39,6 +39,7 @@ import org.apache.spark.scheduler.HighlyCompressedMapStatus
 import org.apache.spark.serializer.KryoTest._
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.collection.OpenHashMap
 
 class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
   conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
@@ -526,6 +527,16 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     val actual: RoaringBitmap = ser.deserialize(ser.serialize(expected))
     assert(actual === expected)
   }
+
+  test("SPARK-37071: OpenHashMap serialize with reference tracking turned off") {
+    val conf = new SparkConf(false)
+    conf.set(KRYO_REFERENCE_TRACKING, false)
+
+    val ser = new KryoSerializer(conf).newInstance()
+
+    val set = new OpenHashMap[Double, Double](10)
+    ser.serialize(set)
+  }
 }
 
 class KryoSerializerAutoResetDisabledSuite extends SparkFunSuite with SharedSparkContext {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala
index e3b2836..247d77d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala
@@ -137,17 +137,12 @@ class GraphXPrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
     }
   }
 
-  // The following member variables are declared as protected instead of private for the
-  // specialization to work (specialized class extends the unspecialized one and needs access
-  // to the "private" variables).
-  // They also should have been val's. We use var's because there is a Scala compiler bug that
-  // would throw illegal access error at runtime if they are declared as val's.
-  protected var grow = (newCapacity: Int) => {
+  private def grow(newCapacity: Int): Unit = {
     _oldValues = _values
     _values = new Array[V](newCapacity)
   }
 
-  protected var move = (oldPos: Int, newPos: Int) => {
+  private def move(oldPos: Int, newPos: Int): Unit = {
     _values(newPos) = _oldValues(oldPos)
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org