Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:14 UTC

[30/69] [abbrv] [partial] Initial work to rename package to org.apache.spark

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
new file mode 100644
index 0000000..ae02226
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -0,0 +1,156 @@
+package org.apache.spark.ui.jobs
+
+import scala.Seq
+import scala.collection.mutable.{ListBuffer, HashMap, HashSet}
+
+import org.apache.spark.{ExceptionFailure, SparkContext, Success, Utils}
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.TaskInfo
+import org.apache.spark.executor.TaskMetrics
+import collection.mutable
+
+/**
+ * Tracks task-level information to be displayed in the UI.
+ *
+ * All access to the data structures in this class must be synchronized on the
+ * class, since the UI thread and the DAGScheduler event loop may otherwise
+ * be reading/updating the internal data structures concurrently.
+ */
+private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
+  // How many stages to remember
+  val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
+  val DEFAULT_POOL_NAME = "default"
+
+  val stageToPool = new HashMap[Stage, String]()
+  val stageToDescription = new HashMap[Stage, String]()
+  val poolToActiveStages = new HashMap[String, HashSet[Stage]]()
+
+  val activeStages = HashSet[Stage]()
+  val completedStages = ListBuffer[Stage]()
+  val failedStages = ListBuffer[Stage]()
+
+  // Total metrics reflect metrics only for completed tasks
+  var totalTime = 0L
+  var totalShuffleRead = 0L
+  var totalShuffleWrite = 0L
+
+  val stageToTime = HashMap[Int, Long]()
+  val stageToShuffleRead = HashMap[Int, Long]()
+  val stageToShuffleWrite = HashMap[Int, Long]()
+  val stageToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
+  val stageToTasksComplete = HashMap[Int, Int]()
+  val stageToTasksFailed = HashMap[Int, Int]()
+  val stageToTaskInfos =
+    HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
+
+  override def onJobStart(jobStart: SparkListenerJobStart) {}
+
+  override def onStageCompleted(stageCompleted: StageCompleted) = synchronized {
+    val stage = stageCompleted.stageInfo.stage
+    poolToActiveStages(stageToPool(stage)) -= stage
+    activeStages -= stage
+    completedStages += stage
+    trimIfNecessary(completedStages)
+  }
+
+  /** If the list of stages grows too large, remove and garbage collect old stages. */
+  def trimIfNecessary(stages: ListBuffer[Stage]) = synchronized {
+    if (stages.size > RETAINED_STAGES) {
+      val toRemove = RETAINED_STAGES / 10
+      stages.takeRight(toRemove).foreach( s => {
+        stageToTaskInfos.remove(s.id)
+        stageToTime.remove(s.id)
+        stageToShuffleRead.remove(s.id)
+        stageToShuffleWrite.remove(s.id)
+        stageToTasksActive.remove(s.id)
+        stageToTasksComplete.remove(s.id)
+        stageToTasksFailed.remove(s.id)
+        stageToPool.remove(s)
+        if (stageToDescription.contains(s)) {stageToDescription.remove(s)}
+      })
+      stages.trimEnd(toRemove)
+    }
+  }
+
+  /** For FIFO scheduling, all stages are placed in the "default" pool, though the pool itself is meaningless there. */
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized {
+    val stage = stageSubmitted.stage
+    activeStages += stage
+
+    val poolName = Option(stageSubmitted.properties).map {
+      p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
+    }.getOrElse(DEFAULT_POOL_NAME)
+    stageToPool(stage) = poolName
+
+    val description = Option(stageSubmitted.properties).flatMap {
+      p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
+    }
+    description.map(d => stageToDescription(stage) = d)
+
+    val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
+    stages += stage
+  }
+  
+  override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
+    val sid = taskStart.task.stageId
+    val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+    tasksActive += taskStart.taskInfo
+    val taskList = stageToTaskInfos.getOrElse(
+      sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+    taskList += ((taskStart.taskInfo, None, None))
+    stageToTaskInfos(sid) = taskList
+  }
+ 
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
+    val sid = taskEnd.task.stageId
+    val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+    tasksActive -= taskEnd.taskInfo
+    val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
+      taskEnd.reason match {
+        case e: ExceptionFailure =>
+          stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
+          (Some(e), e.metrics)
+        case _ =>
+          stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
+          (None, Option(taskEnd.taskMetrics))
+      }
+
+    stageToTime.getOrElseUpdate(sid, 0L)
+    val time = metrics.map(m => m.executorRunTime).getOrElse(0)
+    stageToTime(sid) += time
+    totalTime += time
+
+    stageToShuffleRead.getOrElseUpdate(sid, 0L)
+    val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
+      s.remoteBytesRead).getOrElse(0L)
+    stageToShuffleRead(sid) += shuffleRead
+    totalShuffleRead += shuffleRead
+
+    stageToShuffleWrite.getOrElseUpdate(sid, 0L)
+    val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
+      s.shuffleBytesWritten).getOrElse(0L)
+    stageToShuffleWrite(sid) += shuffleWrite
+    totalShuffleWrite += shuffleWrite
+
+    val taskList = stageToTaskInfos.getOrElse(
+      sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+    taskList -= ((taskEnd.taskInfo, None, None))
+    taskList += ((taskEnd.taskInfo, metrics, failureInfo))
+    stageToTaskInfos(sid) = taskList
+  }
+
+  override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized {
+    jobEnd match {
+      case end: SparkListenerJobEnd =>
+        end.jobResult match {
+          case JobFailed(ex, Some(stage)) =>
+            activeStages -= stage
+            poolToActiveStages(stageToPool(stage)) -= stage
+            failedStages += stage
+            trimIfNecessary(failedStages)
+          case _ =>
+        }
+      case _ =>
+    }
+  }
+}
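
The class comment above requires every reader to synchronize on the listener instance, just as
the SparkListener callbacks do. A minimal sketch of a UI-side reader following that discipline
(the summarize helper is hypothetical and, like the listener itself, usable only from inside the
org.apache.spark package):

    // Hypothetical helper; every field it reads is declared in JobProgressListener above.
    def summarize(listener: JobProgressListener): String = listener.synchronized {
      val active    = listener.activeStages.size
      val completed = listener.completedStages.size
      val failed    = listener.failedStages.size
      "stages: %d active, %d completed, %d failed; %d ms of completed task time"
        .format(active, completed, failed, listener.totalTime)
    }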

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
new file mode 100644
index 0000000..1bb7638
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import akka.util.Duration
+
+import java.text.SimpleDateFormat
+
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.Handler
+
+import scala.Seq
+import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
+
+import org.apache.spark.ui.JettyUtils._
+import org.apache.spark.{ExceptionFailure, SparkContext, Success, Utils}
+import org.apache.spark.scheduler._
+import collection.mutable
+import org.apache.spark.scheduler.cluster.SchedulingMode
+import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+
+/** Web UI showing progress status of all jobs in the given SparkContext. */
+private[spark] class JobProgressUI(val sc: SparkContext) {
+  private var _listener: Option[JobProgressListener] = None
+  def listener = _listener.get
+  val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+
+  private val indexPage = new IndexPage(this)
+  private val stagePage = new StagePage(this)
+  private val poolPage = new PoolPage(this)
+
+  def start() {
+    _listener = Some(new JobProgressListener(sc))
+    sc.addSparkListener(listener)
+  }
+
+  def formatDuration(ms: Long) = Utils.msDurationToString(ms)
+
+  def getHandlers = Seq[(String, Handler)](
+    ("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)),
+    ("/stages/pool", (request: HttpServletRequest) => poolPage.render(request)),
+    ("/stages", (request: HttpServletRequest) => indexPage.render(request))
+  )
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
new file mode 100644
index 0000000..ce92b69
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -0,0 +1,32 @@
+package org.apache.spark.ui.jobs
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{NodeSeq, Node}
+import scala.collection.mutable.HashSet
+
+import org.apache.spark.scheduler.Stage
+import org.apache.spark.ui.UIUtils._
+import org.apache.spark.ui.Page._
+
+/** Page showing specific pool details */
+private[spark] class PoolPage(parent: JobProgressUI) {
+  def listener = parent.listener
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    listener.synchronized {
+      val poolName = request.getParameter("poolname")
+      val poolToActiveStages = listener.poolToActiveStages
+      val activeStages = poolToActiveStages.get(poolName).toSeq.flatten
+      val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
+
+      val pool = listener.sc.getPoolForName(poolName).get
+      val poolTable = new PoolTable(Seq(pool), listener)
+
+      val content = <h4>Summary </h4> ++ poolTable.toNodeSeq() ++
+                    <h4>{activeStages.size} Active Stages</h4> ++ activeStagesTable.toNodeSeq()
+
+      headerSparkPage(content, parent.sc, "Fair Scheduler Pool: " + poolName, Stages)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
new file mode 100644
index 0000000..f31465e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -0,0 +1,55 @@
+package org.apache.spark.ui.jobs
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+import scala.xml.Node
+
+import org.apache.spark.scheduler.Stage
+import org.apache.spark.scheduler.cluster.Schedulable
+
+/** Table showing list of pools */
+private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressListener) {
+
+  var poolToActiveStages: HashMap[String, HashSet[Stage]] = listener.poolToActiveStages
+
+  def toNodeSeq(): Seq[Node] = {
+    listener.synchronized {
+      poolTable(poolRow, pools)
+    }
+  }
+
+  private def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node],
+    rows: Seq[Schedulable]
+    ): Seq[Node] = {
+    <table class="table table-bordered table-striped table-condensed sortable table-fixed">
+      <thead>
+        <th>Pool Name</th>
+        <th>Minimum Share</th>
+        <th>Pool Weight</th>
+        <th>Active Stages</th>
+        <th>Running Tasks</th>
+        <th>SchedulingMode</th>
+      </thead>
+      <tbody>
+        {rows.map(r => makeRow(r, poolToActiveStages))}
+      </tbody>
+    </table>
+  }
+
+  private def poolRow(p: Schedulable, poolToActiveStages: HashMap[String, HashSet[Stage]])
+    : Seq[Node] = {
+    val activeStages = poolToActiveStages.get(p.name) match {
+      case Some(stages) => stages.size
+      case None => 0
+    }
+    <tr>
+      <td><a href={"/stages/pool?poolname=%s".format(p.name)}>{p.name}</a></td>
+      <td>{p.minShare}</td>
+      <td>{p.weight}</td>
+      <td>{activeStages}</td>
+      <td>{p.runningTasks}</td>
+      <td>{p.schedulingMode}</td>
+    </tr>
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
new file mode 100644
index 0000000..2fe85bc
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import java.util.Date
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.ui.UIUtils._
+import org.apache.spark.ui.Page._
+import org.apache.spark.util.Distribution
+import org.apache.spark.{ExceptionFailure, Utils}
+import org.apache.spark.scheduler.cluster.TaskInfo
+import org.apache.spark.executor.TaskMetrics
+
+/** Page showing statistics and task list for a given stage */
+private[spark] class StagePage(parent: JobProgressUI) {
+  def listener = parent.listener
+  val dateFmt = parent.dateFmt
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    listener.synchronized {
+      val stageId = request.getParameter("id").toInt
+      val now = System.currentTimeMillis()
+
+      if (!listener.stageToTaskInfos.contains(stageId)) {
+        val content =
+          <div>
+            <h4>Summary Metrics</h4> No tasks have started yet
+            <h4>Tasks</h4> No tasks have started yet
+          </div>
+        return headerSparkPage(content, parent.sc, "Details for Stage %s".format(stageId), Stages)
+      }
+
+      val tasks = listener.stageToTaskInfos(stageId).toSeq.sortBy(_._1.launchTime)
+
+      val numCompleted = tasks.count(_._1.finished)
+      val shuffleReadBytes = listener.stageToShuffleRead.getOrElse(stageId, 0L)
+      val hasShuffleRead = shuffleReadBytes > 0
+      val shuffleWriteBytes = listener.stageToShuffleWrite.getOrElse(stageId, 0L)
+      val hasShuffleWrite = shuffleWriteBytes > 0
+
+      var activeTime = 0L
+      listener.stageToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
+
+      val summary =
+        <div>
+          <ul class="unstyled">
+            <li>
+              <strong>CPU time: </strong>
+              {parent.formatDuration(listener.stageToTime.getOrElse(stageId, 0L) + activeTime)}
+            </li>
+            {if (hasShuffleRead)
+              <li>
+                <strong>Shuffle read: </strong>
+                {Utils.bytesToString(shuffleReadBytes)}
+              </li>
+            }
+            {if (hasShuffleWrite)
+              <li>
+                <strong>Shuffle write: </strong>
+                {Utils.bytesToString(shuffleWriteBytes)}
+              </li>
+            }
+          </ul>
+        </div>
+
+      val taskHeaders: Seq[String] =
+        Seq("Task ID", "Status", "Locality Level", "Executor", "Launch Time", "Duration") ++
+        Seq("GC Time") ++
+        {if (hasShuffleRead) Seq("Shuffle Read")  else Nil} ++
+        {if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++
+        Seq("Errors")
+
+      val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
+
+      // Excludes tasks which failed and have incomplete metrics
+      val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined))
+
+      val summaryTable: Option[Seq[Node]] =
+        if (validTasks.size == 0) {
+          None
+        }
+        else {
+          val serviceTimes = validTasks.map{case (info, metrics, exception) =>
+            metrics.get.executorRunTime.toDouble}
+          val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
+            ms => parent.formatDuration(ms.toLong))
+
+          def getQuantileCols(data: Seq[Double]) =
+            Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong))
+
+          val shuffleReadSizes = validTasks.map {
+            case(info, metrics, exception) =>
+              metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
+          }
+          val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes)
+
+          val shuffleWriteSizes = validTasks.map {
+            case(info, metrics, exception) =>
+              metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
+          }
+          val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
+
+          val listings: Seq[Seq[String]] = Seq(serviceQuantiles,
+            if (hasShuffleRead) shuffleReadQuantiles else Nil,
+            if (hasShuffleWrite) shuffleWriteQuantiles else Nil)
+
+          val quantileHeaders = Seq("Metric", "Min", "25th percentile",
+            "Median", "75th percentile", "Max")
+          def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
+          Some(listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true))
+        }
+
+      val content =
+        summary ++
+        <h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
+        <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
+        <h4>Tasks</h4> ++ taskTable;
+
+      headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages)
+    }
+  }
+
+
+  def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean)
+             (taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = {
+    def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
+      trace.map(e => <span style="display:block;">{e.toString}</span>)
+    val (info, metrics, exception) = taskData
+
+    val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
+      else metrics.map(m => m.executorRunTime).getOrElse(1)
+    val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
+      else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
+    val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
+
+    <tr>
+      <td>{info.taskId}</td>
+      <td>{info.status}</td>
+      <td>{info.taskLocality}</td>
+      <td>{info.host}</td>
+      <td>{dateFmt.format(new Date(info.launchTime))}</td>
+      <td sorttable_customkey={duration.toString}>
+        {formatDuration}
+      </td>
+      <td sorttable_customkey={gcTime.toString}>
+        {if (gcTime > 0) parent.formatDuration(gcTime) else ""}
+      </td>
+      {if (shuffleRead) {
+        <td>{metrics.flatMap{m => m.shuffleReadMetrics}.map{s =>
+          Utils.bytesToString(s.remoteBytesRead)}.getOrElse("")}</td>
+      }}
+      {if (shuffleWrite) {
+        <td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
+          Utils.bytesToString(s.shuffleBytesWritten)}.getOrElse("")}</td>
+      }}
+      <td>{exception.map(e =>
+        <span>
+          {e.className} ({e.description})<br/>
+          {fmtStackTrace(e.stackTrace)}
+        </span>).getOrElse("")}
+      </td>
+    </tr>
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
new file mode 100644
index 0000000..beb0574
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -0,0 +1,107 @@
+package org.apache.spark.ui.jobs
+
+import java.util.Date
+
+import scala.xml.Node
+import scala.collection.mutable.HashSet
+
+import org.apache.spark.Utils
+import org.apache.spark.scheduler.cluster.{SchedulingMode, TaskInfo}
+import org.apache.spark.scheduler.Stage
+
+
+/** Page showing list of all ongoing and recently finished stages */
+private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressUI) {
+
+  val listener = parent.listener
+  val dateFmt = parent.dateFmt
+  val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
+
+  def toNodeSeq(): Seq[Node] = {
+    listener.synchronized {
+      stageTable(stageRow, stages)
+    }
+  }
+
+  /** Special table which merges two header cells. */
+  private def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
+    <table class="table table-bordered table-striped table-condensed sortable">
+      <thead>
+        <th>Stage Id</th>
+        {if (isFairScheduler) {<th>Pool Name</th>} else {}}
+        <th>Description</th>
+        <th>Submitted</th>
+        <th>Duration</th>
+        <th>Tasks: Succeeded/Total</th>
+        <th>Shuffle Read</th>
+        <th>Shuffle Write</th>
+      </thead>
+      <tbody>
+        {rows.map(r => makeRow(r))}
+      </tbody>
+    </table>
+  }
+
+  private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] = {
+    val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
+    val startWidth = "width: %s%%".format((started.toDouble/total)*100)
+
+    <div class="progress">
+      <span style="text-align:center; position:absolute; width:100%;">
+        {completed}/{total} {failed}
+      </span>
+      <div class="bar bar-completed" style={completeWidth}></div>
+      <div class="bar bar-running" style={startWidth}></div>
+    </div>
+  }
+
+
+  private def stageRow(s: Stage): Seq[Node] = {
+    val submissionTime = s.submissionTime match {
+      case Some(t) => dateFmt.format(new Date(t))
+      case None => "Unknown"
+    }
+
+    val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match {
+      case 0 => ""
+      case b => Utils.bytesToString(b)
+    }
+    val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match {
+      case 0 => ""
+      case b => Utils.bytesToString(b)
+    }
+
+    val startedTasks = listener.stageToTasksActive.getOrElse(s.id, HashSet[TaskInfo]()).size
+    val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
+    val failedTasks = listener.stageToTasksFailed.getOrElse(s.id, 0) match {
+        case f if f > 0 => "(%s failed)".format(f)
+        case _ => ""
+    }
+    val totalTasks = s.numPartitions
+
+    val poolName = listener.stageToPool.get(s)
+
+    val nameLink = <a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a>
+    val description = listener.stageToDescription.get(s)
+      .map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
+    val finishTime = s.completionTime.getOrElse(System.currentTimeMillis())
+    val duration =  s.submissionTime.map(t => finishTime - t)
+
+    <tr>
+      <td>{s.id}</td>
+      {if (isFairScheduler) {
+        <td><a href={"/stages/pool?poolname=%s".format(poolName.get)}>{poolName.get}</a></td>}
+      }
+      <td>{description}</td>
+      <td valign="middle">{submissionTime}</td>
+      <td sorttable_customkey={duration.getOrElse(-1).toString}>
+        {duration.map(d => parent.formatDuration(d)).getOrElse("Unknown")}
+      </td>
+      <td class="progress-cell">
+        {makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)}
+      </td>
+      <td>{shuffleRead}</td>
+      <td>{shuffleWrite}</td>
+    </tr>
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
new file mode 100644
index 0000000..1d633d3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.storage
+
+import akka.util.Duration
+
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.Handler
+
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.ui.JettyUtils._
+
+/** Web UI showing storage status of all RDD's in the given SparkContext. */
+private[spark] class BlockManagerUI(val sc: SparkContext) extends Logging {
+  implicit val timeout = Duration.create(
+    System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+
+  val indexPage = new IndexPage(this)
+  val rddPage = new RDDPage(this)
+
+  def getHandlers = Seq[(String, Handler)](
+    ("/storage/rdd", (request: HttpServletRequest) => rddPage.render(request)),
+    ("/storage", (request: HttpServletRequest) => indexPage.render(request))
+  )
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
new file mode 100644
index 0000000..1eb4a7a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.storage
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.storage.{RDDInfo, StorageUtils}
+import org.apache.spark.Utils
+import org.apache.spark.ui.UIUtils._
+import org.apache.spark.ui.Page._
+
+/** Page showing list of RDD's currently stored in the cluster */
+private[spark] class IndexPage(parent: BlockManagerUI) {
+  val sc = parent.sc
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val storageStatusList = sc.getExecutorStorageStatus
+    // Calculate macro-level statistics
+
+    val rddHeaders = Seq(
+      "RDD Name",
+      "Storage Level",
+      "Cached Partitions",
+      "Fraction Cached",
+      "Size in Memory",
+      "Size on Disk")
+    val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
+    val content = listingTable(rddHeaders, rddRow, rdds)
+
+    headerSparkPage(content, parent.sc, "Storage ", Storage)
+  }
+
+  def rddRow(rdd: RDDInfo): Seq[Node] = {
+    <tr>
+      <td>
+        <a href={"/storage/rdd?id=%s".format(rdd.id)}>
+          {rdd.name}
+        </a>
+      </td>
+      <td>{rdd.storageLevel.description}
+      </td>
+      <td>{rdd.numCachedPartitions}</td>
+      <td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td>
+      <td>{Utils.bytesToString(rdd.memSize)}</td>
+      <td>{Utils.bytesToString(rdd.diskSize)}</td>
+    </tr>
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
new file mode 100644
index 0000000..37baf17
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.storage
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.Utils
+import org.apache.spark.storage.{StorageStatus, StorageUtils}
+import org.apache.spark.storage.BlockManagerMasterActor.BlockStatus
+import org.apache.spark.ui.UIUtils._
+import org.apache.spark.ui.Page._
+
+
+/** Page showing storage details for a given RDD */
+private[spark] class RDDPage(parent: BlockManagerUI) {
+  val sc = parent.sc
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val id = request.getParameter("id")
+    val prefix = "rdd_" + id.toString
+    val storageStatusList = sc.getExecutorStorageStatus
+    val filteredStorageStatusList = StorageUtils.
+      filterStorageStatusByPrefix(storageStatusList, prefix)
+    val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
+
+    val workerHeaders = Seq("Host", "Memory Usage", "Disk Usage")
+    val workers = filteredStorageStatusList.map((prefix, _))
+    val workerTable = listingTable(workerHeaders, workerRow, workers)
+
+    val blockHeaders = Seq("Block Name", "Storage Level", "Size in Memory", "Size on Disk",
+      "Executors")
+
+    val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1)
+    val blockLocations = StorageUtils.blockLocationsFromStorageStatus(filteredStorageStatusList)
+    val blocks = blockStatuses.map {
+      case(id, status) => (id, status, blockLocations.get(id).getOrElse(Seq("UNKNOWN")))
+    }
+    val blockTable = listingTable(blockHeaders, blockRow, blocks)
+
+    val content =
+      <div class="row-fluid">
+        <div class="span12">
+          <ul class="unstyled">
+            <li>
+              <strong>Storage Level:</strong>
+              {rddInfo.storageLevel.description}
+            </li>
+            <li>
+              <strong>Cached Partitions:</strong>
+              {rddInfo.numCachedPartitions}
+            </li>
+            <li>
+              <strong>Total Partitions:</strong>
+              {rddInfo.numPartitions}
+            </li>
+            <li>
+              <strong>Memory Size:</strong>
+              {Utils.bytesToString(rddInfo.memSize)}
+            </li>
+            <li>
+              <strong>Disk Size:</strong>
+              {Utils.bytesToString(rddInfo.diskSize)}
+            </li>
+          </ul>
+        </div>
+      </div>
+
+      <div class="row-fluid">
+        <div class="span12">
+          <h4> Data Distribution on {workers.size} Executors </h4>
+          {workerTable}
+        </div>
+      </div>
+
+      <div class="row-fluid">
+        <div class="span12">
+          <h4> {blocks.size} Partitions </h4>
+          {blockTable}
+        </div>
+      </div>;
+
+    headerSparkPage(content, parent.sc, "RDD Storage Info for " + rddInfo.name, Storage)
+  }
+
+  def blockRow(row: (String, BlockStatus, Seq[String])): Seq[Node] = {
+    val (id, block, locations) = row
+    <tr>
+      <td>{id}</td>
+      <td>
+        {block.storageLevel.description}
+      </td>
+      <td sorttable_customkey={block.memSize.toString}>
+        {Utils.bytesToString(block.memSize)}
+      </td>
+      <td sorttable_customkey={block.diskSize.toString}>
+        {Utils.bytesToString(block.diskSize)}
+      </td>
+      <td>
+        {locations.map(l => <span>{l}<br/></span>)}
+      </td>
+    </tr>
+  }
+
+  def workerRow(worker: (String, StorageStatus)): Seq[Node] = {
+    val (prefix, status) = worker
+    <tr>
+      <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td>
+      <td>
+        {Utils.bytesToString(status.memUsed(prefix))}
+        ({Utils.bytesToString(status.memRemaining)} Remaining)
+      </td>
+      <td>{Utils.bytesToString(status.diskUsed(prefix))}</td>
+    </tr>
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
new file mode 100644
index 0000000..d4c5065
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import akka.actor.{ActorSystem, ExtendedActorSystem}
+import com.typesafe.config.ConfigFactory
+import akka.util.duration._
+import akka.remote.RemoteActorRefProvider
+
+
+/**
+ * Various utility classes for working with Akka.
+ */
+private[spark] object AkkaUtils {
+
+  /**
+   * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
+   * ActorSystem itself and its port (which is hard to get from Akka).
+   *
+   * Note: the `name` parameter is important, as even if a client sends a message to the right
+   * host + port, if the system name is incorrect, Akka will drop the message.
+   */
+  def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
+    val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
+    val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
+    val akkaTimeout = System.getProperty("spark.akka.timeout", "60").toInt
+    val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
+    val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
+    // 10 seconds is the default Akka timeout, but in a cluster we need a higher value by default.
+    val akkaWriteTimeout = System.getProperty("spark.akka.writeTimeout", "30").toInt
+    
+    val akkaConf = ConfigFactory.parseString("""
+      akka.daemonic = on
+      akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
+      akka.stdout-loglevel = "ERROR"
+      akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+      akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
+      akka.remote.netty.hostname = "%s"
+      akka.remote.netty.port = %d
+      akka.remote.netty.connection-timeout = %ds
+      akka.remote.netty.message-frame-size = %d MiB
+      akka.remote.netty.execution-pool-size = %d
+      akka.actor.default-dispatcher.throughput = %d
+      akka.remote.log-remote-lifecycle-events = %s
+      akka.remote.netty.write-timeout = %ds
+      """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize,
+        lifecycleEvents, akkaWriteTimeout))
+
+    val actorSystem = ActorSystem(name, akkaConf)
+
+    // Figure out the port number we bound to, in case port was passed as 0. This is a bit of a
+    // hack because Akka doesn't let you figure out the port through the public API yet.
+    val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
+    val boundPort = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get
+    return (actorSystem, boundPort)
+  }
+}
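
A minimal usage sketch for createActorSystem above, assuming the Akka 2.0.x line this
configuration targets (the object is private[spark], so this only compiles inside the
org.apache.spark package). Passing port 0 binds to an ephemeral port, and the actual port
comes back in the returned tuple:

    // Bind to an ephemeral port; the bound port is returned alongside the ActorSystem.
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExample", "localhost", 0)
    try {
      println("Actor system '" + actorSystem.name + "' bound to port " + boundPort)
    } finally {
      actorSystem.shutdown()  // Akka 2.0.x shutdown call
    }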

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
new file mode 100644
index 0000000..0b51c23
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.Serializable
+import java.util.{PriorityQueue => JPriorityQueue}
+import scala.collection.generic.Growable
+import scala.collection.JavaConverters._
+
+/**
+ * Bounded priority queue. This class wraps the original PriorityQueue
+ * class and modifies it such that only the top K elements are retained.
+ * The top K elements are defined by an implicit Ordering[A].
+ */
+class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
+  extends Iterable[A] with Growable[A] with Serializable {
+
+  private val underlying = new JPriorityQueue[A](maxSize, ord)
+
+  override def iterator: Iterator[A] = underlying.iterator.asScala
+
+  override def ++=(xs: TraversableOnce[A]): this.type = {
+    xs.foreach { this += _ }
+    this
+  }
+
+  override def +=(elem: A): this.type = {
+    if (size < maxSize) underlying.offer(elem)
+    else maybeReplaceLowest(elem)
+    this
+  }
+
+  override def +=(elem1: A, elem2: A, elems: A*): this.type = {
+    this += elem1 += elem2 ++= elems
+  }
+
+  override def clear() { underlying.clear() }
+
+  private def maybeReplaceLowest(a: A): Boolean = {
+    val head = underlying.peek()
+    if (head != null && ord.gt(a, head)) {
+      underlying.poll()
+      underlying.offer(a)
+    } else false
+  }
+}
+
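
A small usage sketch for BoundedPriorityQueue above: with the natural Ordering[Int], a queue of
capacity 3 keeps only the three largest elements it has seen.

    import org.apache.spark.util.BoundedPriorityQueue

    val topThree = new BoundedPriorityQueue[Int](3)
    topThree ++= Seq(5, 1, 9, 7, 3)
    // Iteration order follows the underlying binary heap, so sort before printing.
    println(topThree.toList.sorted)  // List(5, 7, 9)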

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala b/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
new file mode 100644
index 0000000..e214d2a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.InputStream
+import java.nio.ByteBuffer
+import org.apache.spark.storage.BlockManager
+
+/**
+ * Reads data from a ByteBuffer, and optionally cleans it up using BlockManager.dispose()
+ * at the end of the stream (e.g. to close a memory-mapped file).
+ */
+private[spark]
+class ByteBufferInputStream(private var buffer: ByteBuffer, dispose: Boolean = false)
+  extends InputStream {
+
+  override def read(): Int = {
+    if (buffer == null || buffer.remaining() == 0) {
+      cleanUp()
+      -1
+    } else {
+      buffer.get() & 0xFF
+    }
+  }
+
+  override def read(dest: Array[Byte]): Int = {
+    read(dest, 0, dest.length)
+  }
+
+  override def read(dest: Array[Byte], offset: Int, length: Int): Int = {
+    if (buffer == null || buffer.remaining() == 0) {
+      cleanUp()
+      -1
+    } else {
+      val amountToGet = math.min(buffer.remaining(), length)
+      buffer.get(dest, offset, amountToGet)
+      amountToGet
+    }
+  }
+
+  override def skip(bytes: Long): Long = {
+    if (buffer != null) {
+      val amountToSkip = math.min(bytes, buffer.remaining).toInt
+      buffer.position(buffer.position + amountToSkip)
+      if (buffer.remaining() == 0) {
+        cleanUp()
+      }
+      amountToSkip
+    } else {
+      0L
+    }
+  }
+
+  /**
+   * Clean up the buffer, and potentially dispose of it using BlockManager.dispose().
+   */
+  private def cleanUp() {
+    if (buffer != null) {
+      if (dispose) {
+        BlockManager.dispose(buffer)
+      }
+      buffer = null
+    }
+  }
+}
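
A short sketch of the stream above wrapping an in-memory buffer (the class is private[spark]);
dispose is left at its default of false, so no BlockManager cleanup is attempted:

    import java.nio.ByteBuffer
    import org.apache.spark.util.ByteBufferInputStream

    val in = new ByteBufferInputStream(ByteBuffer.wrap("hello".getBytes("UTF-8")))
    val dest = new Array[Byte](5)
    val n = in.read(dest)                     // reads up to 5 bytes from the buffer
    println(new String(dest, 0, n, "UTF-8"))  // hello
    println(in.read())                        // -1 once the buffer is exhausted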

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/util/Clock.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Clock.scala b/core/src/main/scala/org/apache/spark/util/Clock.scala
new file mode 100644
index 0000000..97c2b45
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/Clock.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/**
+ * An interface to represent clocks, so that they can be mocked out in unit tests.
+ */
+private[spark] trait Clock {
+  def getTime(): Long
+}
+
+private[spark] object SystemClock extends Clock {
+  def getTime(): Long = System.currentTimeMillis()
+}
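
Since the trait exists so time can be mocked in tests, a hand-rolled test clock might look like
the sketch below. It is illustrative only, and because Clock is private[spark] it has to live
inside the org.apache.spark package:

    /** A test clock that only advances when told to. */
    class ManualClock(private var time: Long = 0L) extends Clock {
      def getTime(): Long = synchronized { time }
      def advance(millis: Long) { synchronized { time += millis } }
    }

    // In a test: val clock = new ManualClock(); clock.advance(1000); clock.getTime() == 1000L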

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
new file mode 100644
index 0000000..dc15a38
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/**
+ * Wrapper around an iterator which calls a completion method after it successfully iterates through all the elements
+ */
+abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{
+  def next = sub.next
+  def hasNext = {
+    val r = sub.hasNext
+    if (!r) {
+      completion
+    }
+    r
+  }
+
+  def completion()
+}
+
+object CompletionIterator {
+  def apply[A, I <: Iterator[A]](sub: I, completionFunction: => Unit) : CompletionIterator[A,I] = {
+    new CompletionIterator[A,I](sub) {
+      def completion() = completionFunction
+    }
+  }
+}
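
A small usage sketch for CompletionIterator.apply above: the by-name completion function runs
the first time hasNext returns false, i.e. once the wrapped iterator is exhausted.

    var closed = false
    val it = CompletionIterator[Int, Iterator[Int]](Iterator(1, 2, 3), { closed = true })
    println(it.sum)   // 6; consuming the iterator ...
    println(closed)   // true; ... fires the completion callback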

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/util/Distribution.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Distribution.scala b/core/src/main/scala/org/apache/spark/util/Distribution.scala
new file mode 100644
index 0000000..33bf356
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/Distribution.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.PrintStream
+
+/**
+ * Util for getting some stats from a small sample of numeric values, with some handy summary functions.
+ *
+ * Entirely in memory, not intended as a good way to compute stats over large data sets.
+ *
+ * Assumes you are giving it a non-empty set of data
+ */
+class Distribution(val data: Array[Double], val startIdx: Int, val endIdx: Int) {
+  require(startIdx < endIdx)
+  def this(data: Traversable[Double]) = this(data.toArray, 0, data.size)
+  java.util.Arrays.sort(data, startIdx, endIdx)
+  val length = endIdx - startIdx
+
+  val defaultProbabilities = Array(0,0.25,0.5,0.75,1.0)
+
+  /**
+   * Get the value of the distribution at the given probabilities.  Probabilities should be
+   * given from 0 to 1
+   * @param probabilities
+   */
+  def getQuantiles(probabilities: Traversable[Double] = defaultProbabilities) = {
+    probabilities.toIndexedSeq.map{p:Double => data(closestIndex(p))}
+  }
+
+  private def closestIndex(p: Double) = {
+    math.min((p * length).toInt + startIdx, endIdx - 1)
+  }
+
+  def showQuantiles(out: PrintStream = System.out) = {
+    out.println("min\t25%\t50%\t75%\tmax")
+    getQuantiles(defaultProbabilities).foreach{q => out.print(q + "\t")}
+    out.println
+  }
+
+  def statCounter = StatCounter(data.slice(startIdx, endIdx))
+
+  /**
+   * print a summary of this distribution to the given PrintStream.
+   * @param out
+   */
+  def summary(out: PrintStream = System.out) {
+    out.println(statCounter)
+    showQuantiles(out)
+  }
+}
+
+object Distribution {
+
+  def apply(data: Traversable[Double]): Option[Distribution] = {
+    if (data.size > 0)
+      Some(new Distribution(data))
+    else
+      None
+  }
+
+  def showQuantiles(out: PrintStream = System.out, quantiles: Traversable[Double]) {
+    out.println("min\t25%\t50%\t75%\tmax")
+    quantiles.foreach{q => out.print(q + "\t")}
+    out.println
+  }
+}
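
A usage sketch for Distribution above: the companion apply returns None for an empty sample,
and getQuantiles() reports min, quartiles, and max by default.

    import org.apache.spark.util.Distribution

    val d = Distribution(Seq(10.0, 20.0, 30.0, 40.0, 50.0)).get
    // Default probabilities 0, 0.25, 0.5, 0.75, 1.0 map to min, quartiles, and max.
    println(d.getQuantiles())   // 10.0, 20.0, 30.0, 40.0, 50.0
    println(Distribution(Nil))  // None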

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/util/IdGenerator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/IdGenerator.scala b/core/src/main/scala/org/apache/spark/util/IdGenerator.scala
new file mode 100644
index 0000000..17e55f7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/IdGenerator.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * A util used to get a unique generation ID. This is a wrapper around Java's
+ * AtomicInteger. An example usage is in BlockManager, where each BlockManager
+ * instance would start an Akka actor and we use this utility to assign the Akka
+ * actors unique names.
+ */
+private[spark] class IdGenerator {
+  private var id = new AtomicInteger
+  def next: Int = id.incrementAndGet
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/util/IntParam.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/IntParam.scala b/core/src/main/scala/org/apache/spark/util/IntParam.scala
new file mode 100644
index 0000000..626bb49
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/IntParam.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/**
+ * An extractor object for parsing strings into integers.
+ */
+private[spark] object IntParam {
+  def unapply(str: String): Option[Int] = {
+    try {
+      Some(str.toInt)
+    } catch {
+      case e: NumberFormatException => None
+    }
+  }
+}
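
A brief sketch of the IntParam extractor above in use, e.g. when parsing command-line arguments
(the fallback value is hypothetical, and the object is private[spark]):

    def parsePort(arg: String): Int = arg match {
      case IntParam(port) => port
      case _              => 7077  // hypothetical fallback, for illustration only
    }

    parsePort("8080")  // 8080
    parsePort("oops")  // 7077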

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/util/MemoryParam.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/MemoryParam.scala b/core/src/main/scala/org/apache/spark/util/MemoryParam.scala
new file mode 100644
index 0000000..0ee6707
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/MemoryParam.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.Utils
+
+/**
+ * An extractor object for parsing JVM memory strings, such as "10g", into an Int representing
+ * the number of megabytes. Supports the same formats as Utils.memoryStringToMb.
+ */
+private[spark] object MemoryParam {
+  def unapply(str: String): Option[Int] = {
+    try {
+      Some(Utils.memoryStringToMb(str))
+    } catch {
+      case e: NumberFormatException => None
+    }
+  }
+}
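
An analogous sketch for memory strings (illustrative only; same private[spark] caveat as above):

    // Hypothetical sketch, not in the diff.
    package org.apache.spark.util

    object MemoryParamSketch {
      def main(args: Array[String]) {
        "10g" match {
          case MemoryParam(mb) => println("memory in MB: " + mb)  // 10240
          case other           => println("could not parse: " + other)
        }
      }
    }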

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
new file mode 100644
index 0000000..a430a75
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.concurrent.{TimeUnit, ScheduledFuture, Executors}
+import java.util.{TimerTask, Timer}
+import org.apache.spark.Logging
+
+
+/**
+ * Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries).
+ * The cleanup delay is configured through the "spark.cleaner.ttl" system property.
+ */
+class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
+  private val delaySeconds = MetadataCleaner.getDelaySeconds
+  private val periodSeconds = math.max(10, delaySeconds / 10)
+  private val timer = new Timer(name + " cleanup timer", true)
+
+  private val task = new TimerTask {
+    override def run() {
+      try {
+        cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
+        logInfo("Ran metadata cleaner for " + name)
+      } catch {
+        case e: Exception => logError("Error running cleanup task for " + name, e)
+      }
+    }
+  }
+
+  if (delaySeconds > 0) {
+    logDebug(
+      "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds " +
+      "and period of " + periodSeconds + " seconds")
+    timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)
+  }
+
+  def cancel() {
+    timer.cancel()
+  }
+}
+
+
+object MetadataCleaner {
+  def getDelaySeconds = System.getProperty("spark.cleaner.ttl", "-1").toInt
+  def setDelaySeconds(delay: Int) { System.setProperty("spark.cleaner.ttl", delay.toString) }
+}
+
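
A sketch of how the cleaner might be wired up (illustrative only; the names are invented, and it leans on the TimeStampedHashMap added later in this patch):

    // Hypothetical sketch, not in the diff.
    import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}

    object MetadataCleanerSketch {
      def main(args: Array[String]) {
        // Expire entries older than 60 seconds; the timer period is max(10, 60 / 10) = 10 seconds.
        MetadataCleaner.setDelaySeconds(60)
        val cache = new TimeStampedHashMap[String, Int]
        val cleaner = new MetadataCleaner("ExampleCache", cache.clearOldValues)
        cache("answer") = 42
        // ... use the cache; the timer periodically drops stale entries ...
        cleaner.cancel()
      }
    }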

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/util/MutablePair.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/MutablePair.scala b/core/src/main/scala/org/apache/spark/util/MutablePair.scala
new file mode 100644
index 0000000..34f1f66
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/MutablePair.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+
+/**
+ * A tuple of 2 elements. This can be used as an alternative to Scala's Tuple2 when we want to
+ * minimize object allocation.
+ *
+ * @param  _1   Element 1 of this MutablePair
+ * @param  _2   Element 2 of this MutablePair
+ */
+case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T1,
+                      @specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T2]
+  (var _1: T1, var _2: T2)
+  extends Product2[T1, T2]
+{
+  override def toString = "(" + _1 + "," + _2 + ")"
+
+  override def canEqual(that: Any): Boolean = that.isInstanceOf[MutablePair[_,_]]
+}
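
A short sketch of the allocation-saving pattern this enables (illustrative only):

    // Hypothetical sketch, not in the diff.
    import org.apache.spark.util.MutablePair

    object MutablePairSketch {
      def main(args: Array[String]) {
        // Reuse a single pair object instead of allocating a Tuple2 per iteration.
        val pair = new MutablePair[Int, Long](0, 0L)
        var i = 0
        while (i < 3) {
          pair._1 = i
          pair._2 = i * 100L
          println(pair)  // (0,0), then (1,100), then (2,200)
          i += 1
        }
      }
    }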

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/util/NextIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/NextIterator.scala b/core/src/main/scala/org/apache/spark/util/NextIterator.scala
new file mode 100644
index 0000000..8266e5e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/NextIterator.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/** Provides a basic/boilerplate Iterator implementation. */
+private[spark] abstract class NextIterator[U] extends Iterator[U] {
+  
+  private var gotNext = false
+  private var nextValue: U = _
+  private var closed = false
+  protected var finished = false
+
+  /**
+   * Method for subclasses to implement to provide the next element.
+   *
+   * If no next element is available, the subclass should set `finished`
+   * to `true` and may return any value (it will be ignored).
+   *
+   * This convention is used because `null` may be a valid value, and
+   * wrapping results in `Option` would create unnecessary Some/None
+   * allocations for iterators that are called in tight loops.
+   *
+   * @return the next element, or any value after setting `finished` to `true`
+   */
+  protected def getNext(): U
+
+  /**
+   * Method for subclasses to implement when all elements have been successfully
+   * iterated, and the iteration is done.
+   *
+   * <b>Note:</b> `NextIterator` cannot guarantee that `close` will be
+   * called because it has no control over what happens when an exception
+   * happens in the user code that is calling hasNext/next.
+   *
+   * Ideally you should have another try/catch, as in HadoopRDD, that
+   * ensures any resources are closed should iteration fail.
+   */
+  protected def close()
+
+  /**
+   * Calls the subclass-defined close method, but only once.
+   *
+   * Usually calling `close` multiple times should be fine, but historically
+   * there have been issues with some InputFormats throwing exceptions.
+   */
+  def closeIfNeeded() {
+    if (!closed) {
+      close()
+      closed = true
+    }
+  }
+
+  override def hasNext: Boolean = {
+    if (!finished) {
+      if (!gotNext) {
+        nextValue = getNext()
+        if (finished) {
+          closeIfNeeded()
+        }
+        gotNext = true
+      }
+    }
+    !finished
+  }
+
+  override def next(): U = {
+    if (!hasNext) {
+      throw new NoSuchElementException("End of stream")
+    }
+    gotNext = false
+    nextValue
+  }
+}
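
A minimal concrete subclass, to show the getNext/finished/close contract (illustrative only; NextIterator is private[spark], so the sketch assumes it sits under org.apache.spark):

    // Hypothetical sketch, not in the diff.
    package org.apache.spark.util

    import java.io.{BufferedReader, StringReader}

    object NextIteratorSketch {
      def main(args: Array[String]) {
        val reader = new BufferedReader(new StringReader("a\nb\nc"))
        val lines = new NextIterator[String] {
          override def getNext(): String = {
            val line = reader.readLine()
            if (line == null) {
              finished = true  // signal end of stream; the returned value is then ignored
            }
            line
          }
          override def close() {
            reader.close()
          }
        }
        lines.foreach(println)  // prints a, b, c and then closes the reader
      }
    }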

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/util/RateLimitedOutputStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/RateLimitedOutputStream.scala b/core/src/main/scala/org/apache/spark/util/RateLimitedOutputStream.scala
new file mode 100644
index 0000000..47e1b45
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/RateLimitedOutputStream.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import scala.annotation.tailrec
+
+import java.io.OutputStream
+import java.util.concurrent.TimeUnit._
+
+/**
+ * An OutputStream wrapper that throttles writes so that, averaged over a sync interval,
+ * at most `bytesPerSec` bytes per second are written to the underlying stream.
+ */
+class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends OutputStream {
+  val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
+  val CHUNK_SIZE = 8192
+  var lastSyncTime = System.nanoTime
+  var bytesWrittenSinceSync: Long = 0
+
+  override def write(b: Int) {
+    waitToWrite(1)
+    out.write(b)
+  }
+
+  override def write(bytes: Array[Byte]) {
+    write(bytes, 0, bytes.length)
+  }
+
+  @tailrec
+  override final def write(bytes: Array[Byte], offset: Int, length: Int) {
+    val writeSize = math.min(length - offset, CHUNK_SIZE)
+    if (writeSize > 0) {
+      waitToWrite(writeSize)
+      out.write(bytes, offset, writeSize)
+      write(bytes, offset + writeSize, length)
+    }
+  }
+
+  override def flush() {
+    out.flush()
+  }
+
+  override def close() {
+    out.close()
+  }
+
+  @tailrec
+  private def waitToWrite(numBytes: Int) {
+    val now = System.nanoTime
+    val elapsedSecs = SECONDS.convert(math.max(now - lastSyncTime, 1), NANOSECONDS)
+    val rate = bytesWrittenSinceSync.toDouble / elapsedSecs
+    if (rate < bytesPerSec) {
+      // It's okay to write; just update some variables and return
+      bytesWrittenSinceSync += numBytes
+      if (now > lastSyncTime + SYNC_INTERVAL) {
+        // Sync interval has passed; let's resync
+        lastSyncTime = now
+        bytesWrittenSinceSync = numBytes
+      }
+    } else {
+      // Calculate how much time we should sleep to bring ourselves to the desired rate.
+      // Based on throttler in Kafka (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala)
+      val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS)
+      if (sleepTime > 0) Thread.sleep(sleepTime)
+      waitToWrite(numBytes)
+    }
+  }
+}
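
A sketch of the intended use (illustrative only; the output path and rate are made up):

    // Hypothetical sketch, not in the diff.
    import java.io.FileOutputStream
    import org.apache.spark.util.RateLimitedOutputStream

    object RateLimitedOutputStreamSketch {
      def main(args: Array[String]) {
        // Throttle writes to roughly 1 MB/s.
        val out = new RateLimitedOutputStream(new FileOutputStream("/tmp/throttled.bin"), 1024 * 1024)
        try {
          out.write(new Array[Byte](4 * 1024 * 1024))  // should take on the order of 4 seconds
        } finally {
          out.close()
        }
      }
    }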

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala b/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala
new file mode 100644
index 0000000..f2b1ad7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.nio.ByteBuffer
+import java.io.{IOException, ObjectOutputStream, EOFException, ObjectInputStream}
+import java.nio.channels.Channels
+
+/**
+ * A wrapper around a java.nio.ByteBuffer that is serializable through Java serialization, to make
+ * it easier to pass ByteBuffers in case class messages.
+ */
+private[spark]
+class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable {
+  def value = buffer
+
+  private def readObject(in: ObjectInputStream) {
+    val length = in.readInt()
+    buffer = ByteBuffer.allocate(length)
+    var amountRead = 0
+    val channel = Channels.newChannel(in)
+    while (amountRead < length) {
+      val ret = channel.read(buffer)
+      if (ret == -1) {
+        throw new EOFException("End of file before fully reading buffer")
+      }
+      amountRead += ret
+    }
+    buffer.rewind() // Allow us to read it later
+  }
+
+  private def writeObject(out: ObjectOutputStream) {
+    out.writeInt(buffer.limit())
+    if (Channels.newChannel(out).write(buffer) != buffer.limit()) {
+      throw new IOException("Could not fully write buffer to output stream")
+    }
+    buffer.rewind() // Allow us to write it again later
+  }
+}
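
A round-trip sketch through Java serialization (illustrative only; SerializableBuffer is private[spark], so this assumes placement under org.apache.spark):

    // Hypothetical sketch, not in the diff.
    package org.apache.spark.util

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import java.nio.ByteBuffer

    object SerializableBufferSketch {
      def main(args: Array[String]) {
        val wrapped = new SerializableBuffer(ByteBuffer.wrap("hello".getBytes("UTF-8")))

        val bytesOut = new ByteArrayOutputStream()
        val objOut = new ObjectOutputStream(bytesOut)
        objOut.writeObject(wrapped)
        objOut.close()

        val objIn = new ObjectInputStream(new ByteArrayInputStream(bytesOut.toByteArray))
        val copy = objIn.readObject().asInstanceOf[SerializableBuffer]
        println(copy.value.remaining())  // 5 bytes, same contents as the original buffer
      }
    }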

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/util/StatCounter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
new file mode 100644
index 0000000..020d5ed
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/**
+ * A class for tracking the statistics of a set of numbers (count, mean and variance) in a
+ * numerically robust way. Includes support for merging two StatCounters. Based on 
+ * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance Welford and Chan's algorithms for running variance]].
+ *
+ * @constructor Initialize the StatCounter with the given values.
+ */
+class StatCounter(values: TraversableOnce[Double]) extends Serializable {
+  private var n: Long = 0     // Running count of our values
+  private var mu: Double = 0  // Running mean of our values
+  private var m2: Double = 0  // Running variance numerator (sum of (x - mean)^2)
+
+  merge(values)
+
+  /** Initialize the StatCounter with no values. */
+  def this() = this(Nil)
+
+  /** Add a value into this StatCounter, updating the internal statistics. */
+  def merge(value: Double): StatCounter = {
+    val delta = value - mu
+    n += 1
+    mu += delta / n
+    m2 += delta * (value - mu)
+    this
+  }
+
+  /** Add multiple values into this StatCounter, updating the internal statistics. */
+  def merge(values: TraversableOnce[Double]): StatCounter = {
+    values.foreach(v => merge(v))
+    this
+  }
+
+  /** Merge another StatCounter into this one, adding up the internal statistics. */
+  def merge(other: StatCounter): StatCounter = {
+    if (other == this) {
+      merge(other.copy())  // Avoid overwriting fields in a weird order
+    } else {
+      if (n == 0) {
+        mu = other.mu
+        m2 = other.m2
+        n = other.n
+      } else if (other.n != 0) {
+        val delta = other.mu - mu
+        if (other.n * 10 < n) {
+          mu = mu + (delta * other.n) / (n + other.n)
+        } else if (n * 10 < other.n) {
+          mu = other.mu - (delta * n) / (n + other.n)
+        } else {
+          mu = (mu * n + other.mu * other.n) / (n + other.n)
+        }
+        m2 += other.m2 + (delta * delta * n * other.n) / (n + other.n)
+        n += other.n
+      }
+      this
+    }
+  }
+
+  /** Clone this StatCounter */
+  def copy(): StatCounter = {
+    val other = new StatCounter
+    other.n = n
+    other.mu = mu
+    other.m2 = m2
+    other
+  }
+
+  def count: Long = n
+
+  def mean: Double = mu
+
+  def sum: Double = n * mu
+
+  /** Return the variance of the values. */
+  def variance: Double = {
+    if (n == 0)
+      Double.NaN
+    else
+      m2 / n
+  }
+
+  /**
+   * Return the sample variance, which corrects for bias in estimating the variance by dividing
+   * by N-1 instead of N.
+   */
+  def sampleVariance: Double = {
+    if (n <= 1)
+      Double.NaN
+    else
+      m2 / (n - 1)
+  }
+
+  /** Return the standard deviation of the values. */
+  def stdev: Double = math.sqrt(variance)
+
+  /**
+   * Return the sample standard deviation of the values, which corrects for bias in estimating the
+   * variance by dividing by N-1 instead of N.
+   */
+  def sampleStdev: Double = math.sqrt(sampleVariance)
+
+  override def toString: String = {
+    "(count: %d, mean: %f, stdev: %f)".format(count, mean, stdev)
+  }
+}
+
+object StatCounter {
+  /** Build a StatCounter from a list of values. */
+  def apply(values: TraversableOnce[Double]) = new StatCounter(values)
+
+  /** Build a StatCounter from a list of values passed as variable-length arguments. */
+  def apply(values: Double*) = new StatCounter(values)
+}
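
A sketch of merge-based usage, e.g. combining per-partition statistics (illustrative only):

    // Hypothetical sketch, not in the diff.
    import org.apache.spark.util.StatCounter

    object StatCounterSketch {
      def main(args: Array[String]) {
        val left = StatCounter(1.0, 2.0, 3.0)  // stats for one partition
        val right = StatCounter(10.0, 20.0)    // stats for another partition
        val merged = left.merge(right)         // stats over all five values
        println(merged.count)        // 5
        println(merged.mean)         // 7.2
        println(merged.sampleStdev)  // sample standard deviation over all five values
      }
    }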

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
new file mode 100644
index 0000000..277de2f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.concurrent.ConcurrentHashMap
+import scala.collection.JavaConversions
+import scala.collection.mutable.Map
+import scala.collection.immutable
+import org.apache.spark.Logging
+
+/**
+ * This is a custom implementation of scala.collection.mutable.Map which stores the insertion
+ * time stamp along with each key-value pair. Key-value pairs that are older than a particular
+ * threshold time can them be removed using the clearOldValues method. This is intended to be a drop-in
+ * replacement of scala.collection.mutable.HashMap.
+ */
+class TimeStampedHashMap[A, B] extends Map[A, B]() with Logging {
+  val internalMap = new ConcurrentHashMap[A, (B, Long)]()
+
+  def get(key: A): Option[B] = {
+    val value = internalMap.get(key)
+    if (value != null) Some(value._1) else None
+  }
+
+  def iterator: Iterator[(A, B)] = {
+    val jIterator = internalMap.entrySet().iterator()
+    JavaConversions.asScalaIterator(jIterator).map(kv => (kv.getKey, kv.getValue._1))
+  }
+
+  override def + [B1 >: B](kv: (A, B1)): Map[A, B1] = {
+    val newMap = new TimeStampedHashMap[A, B1]
+    newMap.internalMap.putAll(this.internalMap)
+    newMap.internalMap.put(kv._1, (kv._2, currentTime))
+    newMap
+  }
+
+  override def - (key: A): Map[A, B] = {
+    val newMap = new TimeStampedHashMap[A, B]
+    newMap.internalMap.putAll(this.internalMap)
+    newMap.internalMap.remove(key)
+    newMap
+  }
+
+  override def += (kv: (A, B)): this.type = {
+    internalMap.put(kv._1, (kv._2, currentTime))
+    this
+  }
+
+  // Should we return the previous value directly or wrapped in an Option?
+  def putIfAbsent(key: A, value: B): Option[B] = {
+    val prev = internalMap.putIfAbsent(key, (value, currentTime))
+    if (prev != null) Some(prev._1) else None
+  }
+
+
+  override def -= (key: A): this.type = {
+    internalMap.remove(key)
+    this
+  }
+
+  override def update(key: A, value: B) {
+    this += ((key, value))
+  }
+
+  override def apply(key: A): B = {
+    val value = internalMap.get(key)
+    if (value == null) throw new NoSuchElementException()
+    value._1
+  }
+
+  override def filter(p: ((A, B)) => Boolean): Map[A, B] = {
+    JavaConversions.asScalaConcurrentMap(internalMap).map(kv => (kv._1, kv._2._1)).filter(p)
+  }
+
+  override def empty: Map[A, B] = new TimeStampedHashMap[A, B]()
+
+  override def size: Int = internalMap.size
+
+  override def foreach[U](f: ((A, B)) => U) {
+    val iterator = internalMap.entrySet().iterator()
+    while(iterator.hasNext) {
+      val entry = iterator.next()
+      val kv = (entry.getKey, entry.getValue._1)
+      f(kv)
+    }
+  }
+
+  def toMap: immutable.Map[A, B] = iterator.toMap
+
+  /**
+   * Removes old key-value pairs that have timestamp earlier than `threshTime`
+   */
+  def clearOldValues(threshTime: Long) {
+    val iterator = internalMap.entrySet().iterator()
+    while(iterator.hasNext) {
+      val entry = iterator.next()
+      if (entry.getValue._2 < threshTime) {
+        logDebug("Removing key " + entry.getKey)
+        iterator.remove()
+      }
+    }
+  }
+
+  private def currentTime: Long = System.currentTimeMillis()
+
+}
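
A sketch of the timestamp-based expiry (illustrative only; the sleep just guarantees the two insertions straddle the cutoff):

    // Hypothetical sketch, not in the diff.
    import org.apache.spark.util.TimeStampedHashMap

    object TimeStampedHashMapSketch {
      def main(args: Array[String]) {
        val map = new TimeStampedHashMap[String, Int]
        map("stale") = 1
        Thread.sleep(100)
        val cutoff = System.currentTimeMillis()
        map("fresh") = 2
        map.clearOldValues(cutoff)  // drops entries inserted before the cutoff
        println(map.toMap)          // Map(fresh -> 2)
      }
    }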

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala
new file mode 100644
index 0000000..2698313
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import scala.collection.mutable.Set
+import scala.collection.JavaConversions
+import java.util.concurrent.ConcurrentHashMap
+
+
+/**
+ * A custom implementation of scala.collection.mutable.Set that stores the insertion timestamp
+ * of each element, so that elements older than a threshold can be dropped via clearOldValues.
+ */
+class TimeStampedHashSet[A] extends Set[A] {
+  val internalMap = new ConcurrentHashMap[A, Long]()
+
+  def contains(key: A): Boolean = {
+    // Note: ConcurrentHashMap.contains(x) checks values, so use containsKey here.
+    internalMap.containsKey(key)
+  }
+
+  def iterator: Iterator[A] = {
+    val jIterator = internalMap.entrySet().iterator()
+    JavaConversions.asScalaIterator(jIterator).map(_.getKey)
+  }
+
+  override def + (elem: A): Set[A] = {
+    val newSet = new TimeStampedHashSet[A]
+    newSet ++= this
+    newSet += elem
+    newSet
+  }
+
+  override def - (elem: A): Set[A] = {
+    val newSet = new TimeStampedHashSet[A]
+    newSet ++= this
+    newSet -= elem
+    newSet
+  }
+
+  override def += (key: A): this.type = {
+    internalMap.put(key, currentTime)
+    this
+  }
+
+  override def -= (key: A): this.type = {
+    internalMap.remove(key)
+    this
+  }
+
+  override def empty: Set[A] = new TimeStampedHashSet[A]()
+
+  override def size(): Int = internalMap.size()
+
+  override def foreach[U](f: (A) => U): Unit = {
+    val iterator = internalMap.entrySet().iterator()
+    while(iterator.hasNext) {
+      f(iterator.next.getKey)
+    }
+  }
+
+  /**
+   * Removes old values that have timestamp earlier than `threshTime`
+   */
+  def clearOldValues(threshTime: Long) {
+    val iterator = internalMap.entrySet().iterator()
+    while(iterator.hasNext) {
+      val entry = iterator.next()
+      if (entry.getValue < threshTime) {
+        iterator.remove()
+      }
+    }
+  }
+
+  private def currentTime: Long = System.currentTimeMillis()
+}
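
And the analogous sketch for the set (illustrative only):

    // Hypothetical sketch, not in the diff.
    import org.apache.spark.util.TimeStampedHashSet

    object TimeStampedHashSetSketch {
      def main(args: Array[String]) {
        val set = new TimeStampedHashSet[String]
        set += "stale"
        Thread.sleep(100)
        val cutoff = System.currentTimeMillis()
        set += "fresh"
        set.clearOldValues(cutoff)    // drops elements added before the cutoff
        println(set.iterator.toList)  // List(fresh)
      }
    }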

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/util/Vector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Vector.scala b/core/src/main/scala/org/apache/spark/util/Vector.scala
new file mode 100644
index 0000000..fe710c5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/Vector.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+class Vector(val elements: Array[Double]) extends Serializable {
+  def length = elements.length
+
+  def apply(index: Int) = elements(index)
+
+  def + (other: Vector): Vector = {
+    if (length != other.length)
+      throw new IllegalArgumentException("Vectors of different length")
+    return Vector(length, i => this(i) + other(i))
+  }
+
+  def add(other: Vector) = this + other
+
+  def - (other: Vector): Vector = {
+    if (length != other.length)
+      throw new IllegalArgumentException("Vectors of different length")
+    return Vector(length, i => this(i) - other(i))
+  }
+
+  def subtract(other: Vector) = this - other
+
+  def dot(other: Vector): Double = {
+    if (length != other.length)
+      throw new IllegalArgumentException("Vectors of different length")
+    var ans = 0.0
+    var i = 0
+    while (i < length) {
+      ans += this(i) * other(i)
+      i += 1
+    }
+    return ans
+  }
+
+  /**
+   * Return (this + plus) dot other, without creating any intermediate storage.
+   * @param plus vector to add to this one before taking the dot product
+   * @param other vector to take the dot product with
+   * @return the dot product of (this + plus) and other
+   */
+  def plusDot(plus: Vector, other: Vector): Double = {
+    if (length != other.length)
+      throw new IllegalArgumentException("Vectors of different length")
+    if (length != plus.length)
+      throw new IllegalArgumentException("Vectors of different length")
+    var ans = 0.0
+    var i = 0
+    while (i < length) {
+      ans += (this(i) + plus(i)) * other(i)
+      i += 1
+    }
+    return ans
+  }
+
+  def += (other: Vector): Vector = {
+    if (length != other.length)
+      throw new IllegalArgumentException("Vectors of different length")
+    var i = 0
+    while (i < length) {
+      elements(i) += other(i)
+      i += 1
+    }
+    this
+  }
+
+  def addInPlace(other: Vector) = this += other
+
+  def * (scale: Double): Vector = Vector(length, i => this(i) * scale)
+
+  def multiply (d: Double) = this * d
+
+  def / (d: Double): Vector = this * (1 / d)
+
+  def divide (d: Double) = this / d
+
+  def unary_- = this * -1
+
+  def sum = elements.reduceLeft(_ + _)
+
+  def squaredDist(other: Vector): Double = {
+    var ans = 0.0
+    var i = 0
+    while (i < length) {
+      ans += (this(i) - other(i)) * (this(i) - other(i))
+      i += 1
+    }
+    return ans
+  }
+
+  def dist(other: Vector): Double = math.sqrt(squaredDist(other))
+
+  override def toString = elements.mkString("(", ", ", ")")
+}
+
+object Vector {
+  def apply(elements: Array[Double]) = new Vector(elements)
+
+  def apply(elements: Double*) = new Vector(elements.toArray)
+
+  def apply(length: Int, initializer: Int => Double): Vector = {
+    val elements: Array[Double] = Array.tabulate(length)(initializer)
+    return new Vector(elements)
+  }
+
+  def zeros(length: Int) = new Vector(new Array[Double](length))
+
+  def ones(length: Int) = Vector(length, _ => 1)
+
+  class Multiplier(num: Double) {
+    def * (vec: Vector) = vec * num
+  }
+
+  implicit def doubleToMultiplier(num: Double) = new Multiplier(num)
+
+  implicit object VectorAccumParam extends org.apache.spark.AccumulatorParam[Vector] {
+    def addInPlace(t1: Vector, t2: Vector) = t1 + t2
+
+    def zero(initialValue: Vector) = Vector.zeros(initialValue.length)
+  }
+
+}
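
Finally, a short sketch of the Vector arithmetic and the implicit Double multiplier (illustrative only):

    // Hypothetical sketch, not in the diff.
    import org.apache.spark.util.Vector
    import org.apache.spark.util.Vector._  // brings the implicit doubleToMultiplier into scope

    object VectorSketch {
      def main(args: Array[String]) {
        val a = Vector(1.0, 2.0, 3.0)
        val b = Vector(4.0, 5.0, 6.0)
        println(a + b)      // (5.0, 7.0, 9.0)
        println(a dot b)    // 32.0
        println(2.0 * a)    // (2.0, 4.0, 6.0), via the implicit Multiplier
        println(a.dist(b))  // sqrt(27) = 5.196...
      }
    }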