You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2015/05/03 00:08:15 UTC

spark git commit: [SPARK-6030] [CORE] Using simulated field layout method to compute class shellSize

Repository: spark
Updated Branches:
  refs/heads/master da303526e -> bfcd528d6


[SPARK-6030] [CORE] Using simulated field layout method to compute class shellSize

SizeEstimator gives wrong result for Integer on 64bit JVM with UseCompressedOops on, this pr fixes that. For more details, please refer [SPARK-6030](https://issues.apache.org/jira/browse/SPARK-6030)
sryza, I noticed there is a pr to expose SizeEstimator, maybe that should be waited by this pr get merged if we confirm this problem.
And shivaram would you mind to review this pr since you contribute related code. Also cc to srowen and mateiz

Author: Ye Xianjin <ad...@gmail.com>

Closes #4783 from advancedxy/SPARK-6030 and squashes the following commits:

c4dcb41 [Ye Xianjin] Add super.beforeEach in the beforeEach method to make the trait stackable.. Remove useless leading whitespace.
3f80640 [Ye Xianjin] The size of Integer class changes from 24 to 16 on a 64-bit JVM with -UseCompressedOops flag on after the fix. I don't how 100000 was originally calculated, It looks like 100000 is the magic number which makes sure spilling. Because of the size change, It fails because there is no spilling at all. Change the number to a slightly larger number fixes that.
e849d2d [Ye Xianjin] Merge two shellSize assignments into one. Add some explanation to alignSizeUp method.
85a0b51 [Ye Xianjin] Fix typos and update wording in comments. Using alignSizeUp to compute alignSize.
d27eb77 [Ye Xianjin] Add some detailed comments in the code. Add some test cases. It's very difficult to design test cases as the final object alignment will hide a lot of filed layout details if we just considering the whole size.
842aed1 [Ye Xianjin] primitiveSize(cls) can just return Int. Use a simplified class field layout method to calculate class instance size. Will add more documents and test cases. Add a new alignSizeUp function which uses bitwise operators to speedup.
62e8ab4 [Ye Xianjin] Don't alignSize for objects' shellSize, alignSize when added to state.size. Add some primitive wrapper objects size tests.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bfcd528d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bfcd528d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bfcd528d

Branch: refs/heads/master
Commit: bfcd528d6f5a5ebe61e0fcca890143e9a3c7f7f9
Parents: da30352
Author: Ye Xianjin <ad...@gmail.com>
Authored: Sat May 2 23:08:09 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Sat May 2 23:08:09 2015 +0100

----------------------------------------------------------------------
 .../org/apache/spark/util/SizeEstimator.scala   | 61 ++++++++++++++++----
 .../apache/spark/util/SizeEstimatorSuite.scala  | 47 ++++++++++++++-
 .../util/collection/ExternalSorterSuite.scala   | 10 ++--
 3 files changed, 100 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bfcd528d/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 4dd7ab9..d91c329 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -47,6 +47,11 @@ private[spark] object SizeEstimator extends Logging {
   private val FLOAT_SIZE   = 4
   private val DOUBLE_SIZE  = 8
 
+  // Fields can be primitive types, sizes are: 1, 2, 4, 8. Or fields can be pointers. The size of
+  // a pointer is 4 or 8 depending on the JVM (32-bit or 64-bit) and UseCompressedOops flag.
+  // The sizes should be in descending order, as we will use that information for fields placement.
+  private val fieldSizes = List(8, 4, 2, 1)
+
   // Alignment boundary for objects
   // TODO: Is this arch dependent ?
   private val ALIGN_SIZE = 8
@@ -171,7 +176,7 @@ private[spark] object SizeEstimator extends Logging {
       // general all ClassLoaders and Classes will be shared between objects anyway.
     } else {
       val classInfo = getClassInfo(cls)
-      state.size += classInfo.shellSize
+      state.size += alignSize(classInfo.shellSize)
       for (field <- classInfo.pointerFields) {
         state.enqueue(field.get(obj))
       }
@@ -237,8 +242,8 @@ private[spark] object SizeEstimator extends Logging {
     }
     size
   }
-  
-  private def primitiveSize(cls: Class[_]): Long = {
+
+  private def primitiveSize(cls: Class[_]): Int = {
     if (cls == classOf[Byte]) {
       BYTE_SIZE
     } else if (cls == classOf[Boolean]) {
@@ -274,21 +279,50 @@ private[spark] object SizeEstimator extends Logging {
     val parent = getClassInfo(cls.getSuperclass)
     var shellSize = parent.shellSize
     var pointerFields = parent.pointerFields
+    val sizeCount = Array.fill(fieldSizes.max + 1)(0)
 
+    // iterate through the fields of this class and gather information.
     for (field <- cls.getDeclaredFields) {
       if (!Modifier.isStatic(field.getModifiers)) {
         val fieldClass = field.getType
         if (fieldClass.isPrimitive) {
-          shellSize += primitiveSize(fieldClass)
+          sizeCount(primitiveSize(fieldClass)) += 1
         } else {
           field.setAccessible(true) // Enable future get()'s on this field
-          shellSize += pointerSize
+          sizeCount(pointerSize) += 1
           pointerFields = field :: pointerFields
         }
       }
     }
 
-    shellSize = alignSize(shellSize)
+    // Based on the simulated field layout code in Aleksey Shipilev's report:
+    // http://cr.openjdk.java.net/~shade/papers/2013-shipilev-fieldlayout-latest.pdf
+    // The code is in Figure 9.
+    // The simplified idea of field layout consists of 4 parts (see more details in the report):
+    //
+    // 1. field alignment: HotSpot lays out the fields aligned by their size.
+    // 2. object alignment: HotSpot rounds instance size up to 8 bytes
+    // 3. consistent fields layouts throughout the hierarchy: This means we should layout
+    // superclass first. And we can use superclass's shellSize as a starting point to layout the
+    // other fields in this class.
+    // 4. class alignment: HotSpot rounds field blocks up to to HeapOopSize not 4 bytes, confirmed
+    // with Aleksey. see https://bugs.openjdk.java.net/browse/CODETOOLS-7901322
+    //
+    // The real world field layout is much more complicated. There are three kinds of fields
+    // order in Java 8. And we don't consider the @contended annotation introduced by Java 8.
+    // see the HotSpot classloader code, layout_fields method for more details.
+    // hg.openjdk.java.net/jdk8/jdk8/hotspot/file/tip/src/share/vm/classfile/classFileParser.cpp
+    var alignedSize = shellSize
+    for (size <- fieldSizes if sizeCount(size) > 0) {
+      val count = sizeCount(size)
+      // If there are internal gaps, smaller field can fit in.
+      alignedSize = math.max(alignedSize, alignSizeUp(shellSize, size) + size * count)
+      shellSize += size * count
+    }
+
+    // Should choose a larger size to be new shellSize and clearly alignedSize >= shellSize, and
+    // round up the instance filed blocks
+    shellSize = alignSizeUp(alignedSize, pointerSize)
 
     // Create and cache a new ClassInfo
     val newInfo = new ClassInfo(shellSize, pointerFields)
@@ -296,8 +330,15 @@ private[spark] object SizeEstimator extends Logging {
     newInfo
   }
 
-  private def alignSize(size: Long): Long = {
-    val rem = size % ALIGN_SIZE
-    if (rem == 0) size else (size + ALIGN_SIZE - rem)
-  }
+  private def alignSize(size: Long): Long = alignSizeUp(size, ALIGN_SIZE)
+
+  /**
+   * Compute aligned size. The alignSize must be 2^n, otherwise the result will be wrong.
+   * When alignSize = 2^n, alignSize - 1 = 2^n - 1. The binary representation of (alignSize - 1)
+   * will only have n trailing 1s(0b00...001..1). ~(alignSize - 1) will be 0b11..110..0. Hence,
+   * (size + alignSize - 1) & ~(alignSize - 1) will set the last n bits to zeros, which leads to
+   * multiple of alignSize.
+   */
+  private def alignSizeUp(size: Long, alignSize: Int): Long =
+    (size + alignSize - 1) & ~(alignSize - 1)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bfcd528d/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index 28915bd..133a76f 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -36,6 +36,15 @@ class DummyClass4(val d: DummyClass3) {
   val x: Int = 0
 }
 
+// dummy class to show class field blocks alignment.
+class DummyClass5 extends DummyClass1 {
+  val x: Boolean = true
+}
+
+class DummyClass6 extends DummyClass5 {
+  val y: Boolean = true
+}
+
 object DummyString {
   def apply(str: String) : DummyString = new DummyString(str.toArray)
 }
@@ -50,6 +59,7 @@ class SizeEstimatorSuite
 
   override def beforeEach() {
     // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
+    super.beforeEach()
     System.setProperty("os.arch", "amd64")
     System.setProperty("spark.test.useCompressedOops", "true")
   }
@@ -62,6 +72,22 @@ class SizeEstimatorSuite
     assertResult(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3)))
   }
 
+  test("primitive wrapper objects") {
+    assertResult(16)(SizeEstimator.estimate(new java.lang.Boolean(true)))
+    assertResult(16)(SizeEstimator.estimate(new java.lang.Byte("1")))
+    assertResult(16)(SizeEstimator.estimate(new java.lang.Character('1')))
+    assertResult(16)(SizeEstimator.estimate(new java.lang.Short("1")))
+    assertResult(16)(SizeEstimator.estimate(new java.lang.Integer(1)))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Long(1)))
+    assertResult(16)(SizeEstimator.estimate(new java.lang.Float(1.0)))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Double(1.0d)))
+  }
+
+  test("class field blocks rounding") {
+    assertResult(16)(SizeEstimator.estimate(new DummyClass5))
+    assertResult(24)(SizeEstimator.estimate(new DummyClass6))
+  }
+
   // NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors
   // (Sun vs IBM). Use a DummyString class to make tests deterministic.
   test("strings") {
@@ -102,18 +128,18 @@ class SizeEstimatorSuite
     val arr = new Array[Char](100000)
     assertResult(200016)(SizeEstimator.estimate(arr))
     assertResult(480032)(SizeEstimator.estimate(Array.fill(10000)(new DummyString(arr))))
-    
+
     val buf = new ArrayBuffer[DummyString]()
     for (i <- 0 until 5000) {
       buf.append(new DummyString(new Array[Char](10)))
     }
     assertResult(340016)(SizeEstimator.estimate(buf.toArray))
-    
+
     for (i <- 0 until 5000) {
       buf.append(new DummyString(arr))
     }
     assertResult(683912)(SizeEstimator.estimate(buf.toArray))
-    
+
     // If an array contains the *same* element many times, we should only count it once.
     val d1 = new DummyClass1
     // 10 pointers plus 8-byte object
@@ -155,5 +181,20 @@ class SizeEstimatorSuite
     assertResult(64)(SizeEstimator.estimate(DummyString("a")))
     assertResult(64)(SizeEstimator.estimate(DummyString("ab")))
     assertResult(72)(SizeEstimator.estimate(DummyString("abcdefgh")))
+
+    // primitive wrapper classes
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Boolean(true)))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Byte("1")))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Character('1')))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Short("1")))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Integer(1)))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Long(1)))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Float(1.0)))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Double(1.0d)))
+  }
+
+  test("class field blocks rounding on 64-bit VM without useCompressedOops") {
+    assertResult(24)(SizeEstimator.estimate(new DummyClass5))
+    assertResult(32)(SizeEstimator.estimate(new DummyClass6))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bfcd528d/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index 20fd22b..7a98723 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -377,7 +377,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     val sorter = new ExternalSorter[Int, Int, Int](
       None, Some(new HashPartitioner(3)), Some(ord), None)
     assertDidNotBypassMergeSort(sorter)
-    sorter.insertAll((0 until 100000).iterator.map(i => (i, i)))
+    sorter.insertAll((0 until 120000).iterator.map(i => (i, i)))
     assert(diskBlockManager.getAllFiles().length > 0)
     sorter.stop()
     assert(diskBlockManager.getAllBlocks().length === 0)
@@ -385,9 +385,9 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     val sorter2 = new ExternalSorter[Int, Int, Int](
       None, Some(new HashPartitioner(3)), Some(ord), None)
     assertDidNotBypassMergeSort(sorter2)
-    sorter2.insertAll((0 until 100000).iterator.map(i => (i, i)))
+    sorter2.insertAll((0 until 120000).iterator.map(i => (i, i)))
     assert(diskBlockManager.getAllFiles().length > 0)
-    assert(sorter2.iterator.toSet === (0 until 100000).map(i => (i, i)).toSet)
+    assert(sorter2.iterator.toSet === (0 until 120000).map(i => (i, i)).toSet)
     sorter2.stop()
     assert(diskBlockManager.getAllBlocks().length === 0)
   }
@@ -428,8 +428,8 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
       None, Some(new HashPartitioner(3)), Some(ord), None)
     assertDidNotBypassMergeSort(sorter)
     intercept[SparkException] {
-      sorter.insertAll((0 until 100000).iterator.map(i => {
-        if (i == 99990) {
+      sorter.insertAll((0 until 120000).iterator.map(i => {
+        if (i == 119990) {
           throw new SparkException("Intentional failure")
         }
         (i, i)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org