Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:25:44 UTC

[27/50] git commit: Refactor SamplingSizeTracker into SizeTrackingAppendOnlyMap

Refactor SamplingSizeTracker into SizeTrackingAppendOnlyMap


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/daa7792a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/daa7792a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/daa7792a

Branch: refs/heads/master
Commit: daa7792ad654e24012439db79c5a7f4abf149dc1
Parents: 347fafe
Author: Aaron Davidson <aa...@databricks.com>
Authored: Mon Dec 30 23:07:29 2013 -0800
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Mon Dec 30 23:39:02 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/util/SamplingSizeTracker.scala |  83 -------------
 .../collection/SizeTrackingAppendOnlyMap.scala  |  71 +++++++++--
 .../spark/util/SamplingSizeTrackerSuite.scala   | 120 -------------------
 .../util/SizeTrackingAppendOnlyMapSuite.scala   | 120 +++++++++++++++++++
 4 files changed, 184 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/daa7792a/core/src/main/scala/org/apache/spark/util/SamplingSizeTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SamplingSizeTracker.scala b/core/src/main/scala/org/apache/spark/util/SamplingSizeTracker.scala
deleted file mode 100644
index 3eb8066..0000000
--- a/core/src/main/scala/org/apache/spark/util/SamplingSizeTracker.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import org.apache.spark.util.SamplingSizeTracker.Sample
-
-/**
- * Estimates the size of an object as it grows, in bytes.
- * We sample with a slow exponential back-off using the SizeEstimator to amortize the time,
- * as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds).
- *
- * Users should call updateMade() every time their object is updated with new data, or 
- * flushSamples() if there is a non-linear change in object size (otherwise linear is assumed).
- * Not threadsafe.
- */
-private[spark] class SamplingSizeTracker(obj: AnyRef) {
-  /**
-   * Controls the base of the exponential which governs the rate of sampling.
-   * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements.
-   */
-  private val SAMPLE_GROWTH_RATE = 1.1
-
-  private var lastLastSample: Sample = _
-  private var lastSample: Sample = _
-
-  private var numUpdates: Long = _
-  private var nextSampleNum: Long = _
-
-  flushSamples()
-
-  /** Called after a non-linear change in the tracked object. Takes a new sample. */
-  def flushSamples() {
-    numUpdates = 0
-    nextSampleNum = 1
-    // Throw out both prior samples to avoid overestimating delta.
-    lastSample = Sample(SizeEstimator.estimate(obj), 0)
-    lastLastSample = lastSample
-  }
-
-  /** To be called after an update to the tracked object. Amortized O(1) time. */
-  def updateMade() {
-    numUpdates += 1
-    if (nextSampleNum == numUpdates) {
-      lastLastSample = lastSample
-      lastSample = Sample(SizeEstimator.estimate(obj), numUpdates)
-      nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
-    }
-  }
-
-  /** Estimates the current size of the tracked object. O(1) time. */
-  def estimateSize(): Long = {
-    val interpolatedDelta =
-      if (lastLastSample != null && lastLastSample != lastSample) {
-        (lastSample.size - lastLastSample.size).toDouble /
-          (lastSample.numUpdates - lastLastSample.numUpdates)
-      } else if (lastSample.numUpdates > 0) {
-        lastSample.size.toDouble / lastSample.numUpdates
-      } else {
-        0
-      }
-    val extrapolatedDelta = math.max(0, interpolatedDelta * (numUpdates - lastSample.numUpdates))
-    (lastSample.size + extrapolatedDelta).toLong
-  }
-}
-
-object SamplingSizeTracker {
-  case class Sample(size: Long, numUpdates: Long)
-}
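
For context, the removed tracker extrapolated linearly from its last two
samples: if lastLastSample = Sample(size = 2000, numUpdates = 100) and
lastSample = Sample(size = 2400, numUpdates = 120), the interpolated delta is
(2400 - 2000) / (120 - 100) = 20 bytes per update, so at numUpdates = 130 the
estimate is 2400 + 20 * 10 = 2600 bytes. A minimal sketch of how the old API
was driven (the ArrayBuffer payload here is illustrative, not from this patch):

  import scala.collection.mutable.ArrayBuffer
  import org.apache.spark.util.SamplingSizeTracker

  val buf = new ArrayBuffer[String]()
  val tracker = new SamplingSizeTracker(buf)  // constructor takes an initial sample

  for (i <- 1 to 1000) {
    buf += "x" * 16
    tracker.updateMade()             // amortized O(1); sampling backs off at rate 1.1
  }
  println(tracker.estimateSize())    // O(1) extrapolation from the last two samples

  buf.clear()
  tracker.flushSamples()             // non-linear change: throw out both prior samples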

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/daa7792a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
index ea0f2fd..e8401ab 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
@@ -17,28 +17,85 @@
 
 package org.apache.spark.util.collection
 
-import org.apache.spark.util.SamplingSizeTracker
+import scala.collection.mutable.ArrayBuffer
 
-/** Append-only map that keeps track of its estimated size in bytes. */
+import org.apache.spark.util.SizeEstimator
+import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.Sample
+
+/**
+ * Append-only map that keeps track of its estimated size in bytes.
+ * We sample with a slow exponential back-off using the SizeEstimator to amortize the time,
+ * as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds).
+ */
 private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] {
 
-  private val sizeTracker = new SamplingSizeTracker(this)
+  /**
+   * Controls the base of the exponential which governs the rate of sampling.
+   * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements.
+   */
+  private val SAMPLE_GROWTH_RATE = 1.1
+
+  /** All samples taken since last resetSamples(). Only the last two are used for extrapolation. */
+  private val samples = new ArrayBuffer[Sample]()
+
+  /** Total number of insertions and updates into the map since the last resetSamples(). */
+  private var numUpdates: Long = _
 
-  def estimateSize() = sizeTracker.estimateSize()
+  /** The value of 'numUpdates' at which we will take our next sample. */
+  private var nextSampleNum: Long = _
+
+  /** The average number of bytes per update between our last two samples. */
+  private var bytesPerUpdate: Double = _
+
+  resetSamples()
+
+  /** Called when the underlying hash table grows, as resizing is a dramatic size change, especially for small maps. */
+  def resetSamples() {
+    numUpdates = 1
+    nextSampleNum = 1
+    samples.clear()
+    takeSample()
+  }
 
   override def update(key: K, value: V): Unit = {
     super.update(key, value)
-    sizeTracker.updateMade()
+    numUpdates += 1
+    if (nextSampleNum == numUpdates) { takeSample() }
   }
 
   override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
     val newValue = super.changeValue(key, updateFunc)
-    sizeTracker.updateMade()
+    numUpdates += 1
+    if (nextSampleNum == numUpdates) { takeSample() }
     newValue
   }
 
+  /** Takes a new sample of the current map's size. */
+  def takeSample() {
+    samples += Sample(SizeEstimator.estimate(this), numUpdates)
+    // Only use the last two samples to extrapolate. If fewer than 2 samples, assume no change.
+    bytesPerUpdate = math.max(0, samples.toList.reverse match {
+      case latest :: previous :: tail =>
+        (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
+      case _ =>
+        0
+    })
+    nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
+  }
+
   override protected def growTable() {
     super.growTable()
-    sizeTracker.flushSamples()
+    resetSamples()
+  }
+
+  /** Estimates the current size of the map in bytes. O(1) time. */
+  def estimateSize(): Long = {
+    assert(samples.nonEmpty)
+    val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
+    (samples.last.size + extrapolatedDelta).toLong
   }
 }
+
+object SizeTrackingAppendOnlyMap {
+  case class Sample(size: Long, numUpdates: Long)
+}
\ No newline at end of file
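
With the refactor, the sampling state lives in the map itself, so callers
simply insert and query. Given SAMPLE_GROWTH_RATE = 1.1, samples are taken at
update counts 1, 2, 3, ..., 10, 11, 13, 15, ..., i.e. roughly every 10% growth
in numUpdates once the map is warm. A minimal usage sketch (the value type and
the spill threshold are illustrative; spilling is the motivating use case, not
code from this patch):

  import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap

  val map = new SizeTrackingAppendOnlyMap[Int, Array[Byte]]()
  for (i <- 0 until 100000) {
    map.update(i, new Array[Byte](128))   // bumps numUpdates; may take a sample
    if (map.estimateSize() > 64L * 1024 * 1024) {
      // e.g. spill the in-memory map to disk and start a new one
    }
  }

Note that growTable() calls resetSamples(), since doubling the backing array is
a non-linear size jump that would otherwise skew the per-update delta.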

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/daa7792a/core/src/test/scala/org/apache/spark/util/SamplingSizeTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/SamplingSizeTrackerSuite.scala b/core/src/test/scala/org/apache/spark/util/SamplingSizeTrackerSuite.scala
deleted file mode 100644
index 47e4723..0000000
--- a/core/src/test/scala/org/apache/spark/util/SamplingSizeTrackerSuite.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import scala.util.Random
-
-import org.scalatest.{BeforeAndAfterAll, FunSuite}
-
-import org.apache.spark.util.SamplingSizeTrackerSuite.LargeDummyClass
-import org.apache.spark.util.collection.{AppendOnlyMap, SizeTrackingAppendOnlyMap}
-
-class SamplingSizeTrackerSuite extends FunSuite with BeforeAndAfterAll {
-  val NORMAL_ERROR = 0.20
-  val HIGH_ERROR = 0.30
-
-  test("fixed size insertions") {
-    testWith[Int, Long](10000, i => (i, i.toLong))
-    testWith[Int, (Long, Long)](10000, i => (i, (i.toLong, i.toLong)))
-    testWith[Int, LargeDummyClass](10000, i => (i, new LargeDummyClass()))
-  }
-
-  test("variable size insertions") {
-    val rand = new Random(123456789)
-    def randString(minLen: Int, maxLen: Int): String = {
-      "a" * (rand.nextInt(maxLen - minLen) + minLen)
-    }
-    testWith[Int, String](10000, i => (i, randString(0, 10)))
-    testWith[Int, String](10000, i => (i, randString(0, 100)))
-    testWith[Int, String](10000, i => (i, randString(90, 100)))
-  }
-
-  test("updates") {
-    val rand = new Random(123456789)
-    def randString(minLen: Int, maxLen: Int): String = {
-      "a" * (rand.nextInt(maxLen - minLen) + minLen)
-    }
-    testWith[String, Int](10000, i => (randString(0, 10000), i))
-  }
-
-  def testWith[K, V](numElements: Int, makeElement: (Int) => (K, V)) {
-    val map = new SizeTrackingAppendOnlyMap[K, V]()
-    for (i <- 0 until numElements) {
-      val (k, v) = makeElement(i)
-      map(k) = v
-      expectWithinError(map, map.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR)
-    }
-  }
-
-  def expectWithinError(obj: AnyRef, estimatedSize: Long, error: Double) {
-    val betterEstimatedSize = SizeEstimator.estimate(obj)
-    assert(betterEstimatedSize * (1 - error) < estimatedSize,
-      s"Estimated size $estimatedSize was less than expected size $betterEstimatedSize")
-    assert(betterEstimatedSize * (1 + 2 * error) > estimatedSize,
-      s"Estimated size $estimatedSize was greater than expected size $betterEstimatedSize")
-  }
-}
-
-object SamplingSizeTrackerSuite {
-  // Speed test, for reproducibility of results.
-  // These could be highly non-deterministic in general, however.
-  // Results:
-  // AppendOnlyMap:   30 ms
-  // SizeTracker:     45 ms
-  // SizeEstimator: 1500 ms
-  def main(args: Array[String]) {
-    val numElements = 100000
-
-    val baseTimes = for (i <- 0 until 3) yield time {
-      val map = new AppendOnlyMap[Int, LargeDummyClass]()
-      for (i <- 0 until numElements) {
-        map(i) = new LargeDummyClass()
-      }
-    }
-
-    val sampledTimes = for (i <- 0 until 3) yield time {
-      val map = new SizeTrackingAppendOnlyMap[Int, LargeDummyClass]()
-      for (i <- 0 until numElements) {
-        map(i) = new LargeDummyClass()
-        map.estimateSize()
-      }
-    }
-
-    val unsampledTimes = for (i <- 0 until 3) yield time {
-      val map = new AppendOnlyMap[Int, LargeDummyClass]()
-      for (i <- 0 until numElements) {
-        map(i) = new LargeDummyClass()
-        SizeEstimator.estimate(map)
-      }
-    }
-
-    println("Base: " + baseTimes)
-    println("SizeTracker (sampled): " + sampledTimes)
-    println("SizeEstimator (unsampled): " + unsampledTimes)
-  }
-
-  def time(f: => Unit): Long = {
-    val start = System.currentTimeMillis()
-    f
-    System.currentTimeMillis() - start
-  }
-
-  private class LargeDummyClass {
-    val arr = new Array[Int](100)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/daa7792a/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala
new file mode 100644
index 0000000..93f0c6a
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import scala.util.Random
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+import org.apache.spark.util.SizeTrackingAppendOnlyMapSuite.LargeDummyClass
+import org.apache.spark.util.collection.{AppendOnlyMap, SizeTrackingAppendOnlyMap}
+
+class SizeTrackingAppendOnlyMapSuite extends FunSuite with BeforeAndAfterAll {
+  val NORMAL_ERROR = 0.20
+  val HIGH_ERROR = 0.30
+
+  test("fixed size insertions") {
+    testWith[Int, Long](10000, i => (i, i.toLong))
+    testWith[Int, (Long, Long)](10000, i => (i, (i.toLong, i.toLong)))
+    testWith[Int, LargeDummyClass](10000, i => (i, new LargeDummyClass()))
+  }
+
+  test("variable size insertions") {
+    val rand = new Random(123456789)
+    def randString(minLen: Int, maxLen: Int): String = {
+      "a" * (rand.nextInt(maxLen - minLen) + minLen)
+    }
+    testWith[Int, String](10000, i => (i, randString(0, 10)))
+    testWith[Int, String](10000, i => (i, randString(0, 100)))
+    testWith[Int, String](10000, i => (i, randString(90, 100)))
+  }
+
+  test("updates") {
+    val rand = new Random(123456789)
+    def randString(minLen: Int, maxLen: Int): String = {
+      "a" * (rand.nextInt(maxLen - minLen) + minLen)
+    }
+    testWith[String, Int](10000, i => (randString(0, 10000), i))
+  }
+
+  def testWith[K, V](numElements: Int, makeElement: (Int) => (K, V)) {
+    val map = new SizeTrackingAppendOnlyMap[K, V]()
+    for (i <- 0 until numElements) {
+      val (k, v) = makeElement(i)
+      map(k) = v
+      expectWithinError(map, map.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR)
+    }
+  }
+
+  def expectWithinError(obj: AnyRef, estimatedSize: Long, error: Double) {
+    val betterEstimatedSize = SizeEstimator.estimate(obj)
+    assert(betterEstimatedSize * (1 - error) < estimatedSize,
+      s"Estimated size $estimatedSize was less than expected size $betterEstimatedSize")
+    assert(betterEstimatedSize * (1 + 2 * error) > estimatedSize,
+      s"Estimated size $estimatedSize was greater than expected size $betterEstimatedSize")
+  }
+}
+
+object SizeTrackingAppendOnlyMapSuite {
+  // Speed test. Sample results are recorded below for reference,
+  // though such timings can be highly non-deterministic in general.
+  // Results:
+  // AppendOnlyMap:   31 ms
+  // SizeTracker:     54 ms
+  // SizeEstimator: 1500 ms
+  def main(args: Array[String]) {
+    val numElements = 100000
+
+    val baseTimes = for (i <- 0 until 10) yield time {
+      val map = new AppendOnlyMap[Int, LargeDummyClass]()
+      for (i <- 0 until numElements) {
+        map(i) = new LargeDummyClass()
+      }
+    }
+
+    val sampledTimes = for (i <- 0 until 10) yield time {
+      val map = new SizeTrackingAppendOnlyMap[Int, LargeDummyClass]()
+      for (i <- 0 until numElements) {
+        map(i) = new LargeDummyClass()
+        map.estimateSize()
+      }
+    }
+
+    val unsampledTimes = for (i <- 0 until 3) yield time {
+      val map = new AppendOnlyMap[Int, LargeDummyClass]()
+      for (i <- 0 until numElements) {
+        map(i) = new LargeDummyClass()
+        SizeEstimator.estimate(map)
+      }
+    }
+
+    println("Base: " + baseTimes)
+    println("SizeTracker (sampled): " + sampledTimes)
+    println("SizeEstimator (unsampled): " + unsampledTimes)
+  }
+
+  def time(f: => Unit): Long = {
+    val start = System.currentTimeMillis()
+    f
+    System.currentTimeMillis() - start
+  }
+
+  private class LargeDummyClass {
+    val arr = new Array[Int](100)
+  }
+}