Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:58:57 UTC

[13/69] [abbrv] [partial] Initial work to rename package to org.apache.spark

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
deleted file mode 100644
index 1d9767a..0000000
--- a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-package spark.ui.jobs
-
-import scala.Seq
-import scala.collection.mutable.{ListBuffer, HashMap, HashSet}
-
-import spark.{ExceptionFailure, SparkContext, Success, Utils}
-import spark.scheduler._
-import spark.scheduler.cluster.TaskInfo
-import spark.executor.TaskMetrics
-import collection.mutable
-
-/**
- * Tracks task-level information to be displayed in the UI.
- *
- * All access to the data structures in this class must be synchronized on the
- * class, since the UI thread and the DAGScheduler event loop may otherwise
- * be reading/updating the internal data structures concurrently.
- */
-private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
-  // How many stages to remember
-  val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
-  val DEFAULT_POOL_NAME = "default"
-
-  val stageToPool = new HashMap[Stage, String]()
-  val stageToDescription = new HashMap[Stage, String]()
-  val poolToActiveStages = new HashMap[String, HashSet[Stage]]()
-
-  val activeStages = HashSet[Stage]()
-  val completedStages = ListBuffer[Stage]()
-  val failedStages = ListBuffer[Stage]()
-
-  // Total metrics reflect metrics only for completed tasks
-  var totalTime = 0L
-  var totalShuffleRead = 0L
-  var totalShuffleWrite = 0L
-
-  val stageToTime = HashMap[Int, Long]()
-  val stageToShuffleRead = HashMap[Int, Long]()
-  val stageToShuffleWrite = HashMap[Int, Long]()
-  val stageToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
-  val stageToTasksComplete = HashMap[Int, Int]()
-  val stageToTasksFailed = HashMap[Int, Int]()
-  val stageToTaskInfos =
-    HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
-
-  override def onJobStart(jobStart: SparkListenerJobStart) {}
-
-  override def onStageCompleted(stageCompleted: StageCompleted) = synchronized {
-    val stage = stageCompleted.stageInfo.stage
-    poolToActiveStages(stageToPool(stage)) -= stage
-    activeStages -= stage
-    completedStages += stage
-    trimIfNecessary(completedStages)
-  }
-
-  /** If stages is too large, remove and garbage collect old stages */
-  def trimIfNecessary(stages: ListBuffer[Stage]) = synchronized {
-    if (stages.size > RETAINED_STAGES) {
-      val toRemove = RETAINED_STAGES / 10
-      stages.takeRight(toRemove).foreach( s => {
-        stageToTaskInfos.remove(s.id)
-        stageToTime.remove(s.id)
-        stageToShuffleRead.remove(s.id)
-        stageToShuffleWrite.remove(s.id)
-        stageToTasksActive.remove(s.id)
-        stageToTasksComplete.remove(s.id)
-        stageToTasksFailed.remove(s.id)
-        stageToPool.remove(s)
-        if (stageToDescription.contains(s)) {stageToDescription.remove(s)}
-      })
-      stages.trimEnd(toRemove)
-    }
-  }
-
-  /** For FIFO, all stages are contained by "default" pool but "default" pool here is meaningless */
-  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized {
-    val stage = stageSubmitted.stage
-    activeStages += stage
-
-    val poolName = Option(stageSubmitted.properties).map {
-      p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
-    }.getOrElse(DEFAULT_POOL_NAME)
-    stageToPool(stage) = poolName
-
-    val description = Option(stageSubmitted.properties).flatMap {
-      p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
-    }
-    description.map(d => stageToDescription(stage) = d)
-
-    val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
-    stages += stage
-  }
-  
-  override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
-    val sid = taskStart.task.stageId
-    val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
-    tasksActive += taskStart.taskInfo
-    val taskList = stageToTaskInfos.getOrElse(
-      sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
-    taskList += ((taskStart.taskInfo, None, None))
-    stageToTaskInfos(sid) = taskList
-  }
- 
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
-    val sid = taskEnd.task.stageId
-    val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
-    tasksActive -= taskEnd.taskInfo
-    val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
-      taskEnd.reason match {
-        case e: ExceptionFailure =>
-          stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
-          (Some(e), e.metrics)
-        case _ =>
-          stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
-          (None, Option(taskEnd.taskMetrics))
-      }
-
-    stageToTime.getOrElseUpdate(sid, 0L)
-    val time = metrics.map(m => m.executorRunTime).getOrElse(0)
-    stageToTime(sid) += time
-    totalTime += time
-
-    stageToShuffleRead.getOrElseUpdate(sid, 0L)
-    val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
-      s.remoteBytesRead).getOrElse(0L)
-    stageToShuffleRead(sid) += shuffleRead
-    totalShuffleRead += shuffleRead
-
-    stageToShuffleWrite.getOrElseUpdate(sid, 0L)
-    val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
-      s.shuffleBytesWritten).getOrElse(0L)
-    stageToShuffleWrite(sid) += shuffleWrite
-    totalShuffleWrite += shuffleWrite
-
-    val taskList = stageToTaskInfos.getOrElse(
-      sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
-    taskList -= ((taskEnd.taskInfo, None, None))
-    taskList += ((taskEnd.taskInfo, metrics, failureInfo))
-    stageToTaskInfos(sid) = taskList
-  }
-
-  override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized {
-    jobEnd match {
-      case end: SparkListenerJobEnd =>
-        end.jobResult match {
-          case JobFailed(ex, Some(stage)) =>
-            activeStages -= stage
-            poolToActiveStages(stageToPool(stage)) -= stage
-            failedStages += stage
-            trimIfNecessary(failedStages)
-          case _ =>
-        }
-      case _ =>
-    }
-  }
-}
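
For reference, the deleted listener above simply overrides the SparkListener callbacks and guards its state with synchronization, as its scaladoc notes. A minimal sketch of the same pattern against the pre-rename spark.* API shown here (the class name and counter are illustrative, and it assumes the remaining SparkListener callbacks have no-op defaults, as the partial overrides above suggest):

  import spark.scheduler.{SparkListener, SparkListenerTaskEnd}

  // Illustrative only: count finished tasks, synchronizing like JobProgressListener does.
  class TaskCountListener extends SparkListener {
    private var tasksEnded = 0
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { tasksEnded += 1 }
    def count: Int = synchronized { tasksEnded }
  }

Such a listener would be registered with the SparkContext the same way JobProgressUI does below, via sc.addSparkListener(...).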

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
deleted file mode 100644
index c83f102..0000000
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.ui.jobs
-
-import akka.util.Duration
-
-import java.text.SimpleDateFormat
-
-import javax.servlet.http.HttpServletRequest
-
-import org.eclipse.jetty.server.Handler
-
-import scala.Seq
-import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
-
-import spark.ui.JettyUtils._
-import spark.{ExceptionFailure, SparkContext, Success, Utils}
-import spark.scheduler._
-import collection.mutable
-import spark.scheduler.cluster.SchedulingMode
-import spark.scheduler.cluster.SchedulingMode.SchedulingMode
-
-/** Web UI showing progress status of all jobs in the given SparkContext. */
-private[spark] class JobProgressUI(val sc: SparkContext) {
-  private var _listener: Option[JobProgressListener] = None
-  def listener = _listener.get
-  val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
-
-  private val indexPage = new IndexPage(this)
-  private val stagePage = new StagePage(this)
-  private val poolPage = new PoolPage(this)
-
-  def start() {
-    _listener = Some(new JobProgressListener(sc))
-    sc.addSparkListener(listener)
-  }
-
-  def formatDuration(ms: Long) = Utils.msDurationToString(ms)
-
-  def getHandlers = Seq[(String, Handler)](
-    ("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)),
-    ("/stages/pool", (request: HttpServletRequest) => poolPage.render(request)),
-    ("/stages", (request: HttpServletRequest) => indexPage.render(request))
-  )
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/ui/jobs/PoolPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
deleted file mode 100644
index 7fb74dc..0000000
--- a/core/src/main/scala/spark/ui/jobs/PoolPage.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-package spark.ui.jobs
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.{NodeSeq, Node}
-import scala.collection.mutable.HashSet
-
-import spark.scheduler.Stage
-import spark.ui.UIUtils._
-import spark.ui.Page._
-
-/** Page showing specific pool details */
-private[spark] class PoolPage(parent: JobProgressUI) {
-  def listener = parent.listener
-
-  def render(request: HttpServletRequest): Seq[Node] = {
-    listener.synchronized {
-      val poolName = request.getParameter("poolname")
-      val poolToActiveStages = listener.poolToActiveStages
-      val activeStages = poolToActiveStages.get(poolName).toSeq.flatten
-      val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
-
-      val pool = listener.sc.getPoolForName(poolName).get
-      val poolTable = new PoolTable(Seq(pool), listener)
-
-      val content = <h4>Summary </h4> ++ poolTable.toNodeSeq() ++
-                    <h4>{activeStages.size} Active Stages</h4> ++ activeStagesTable.toNodeSeq()
-
-      headerSparkPage(content, parent.sc, "Fair Scheduler Pool: " + poolName, Stages)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/ui/jobs/PoolTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
deleted file mode 100644
index 621828f..0000000
--- a/core/src/main/scala/spark/ui/jobs/PoolTable.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-package spark.ui.jobs
-
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-import scala.xml.Node
-
-import spark.scheduler.Stage
-import spark.scheduler.cluster.Schedulable
-
-/** Table showing list of pools */
-private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressListener) {
-
-  var poolToActiveStages: HashMap[String, HashSet[Stage]] = listener.poolToActiveStages
-
-  def toNodeSeq(): Seq[Node] = {
-    listener.synchronized {
-      poolTable(poolRow, pools)
-    }
-  }
-
-  private def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node],
-    rows: Seq[Schedulable]
-    ): Seq[Node] = {
-    <table class="table table-bordered table-striped table-condensed sortable table-fixed">
-      <thead>
-        <th>Pool Name</th>
-        <th>Minimum Share</th>
-        <th>Pool Weight</th>
-        <th>Active Stages</th>
-        <th>Running Tasks</th>
-        <th>SchedulingMode</th>
-      </thead>
-      <tbody>
-        {rows.map(r => makeRow(r, poolToActiveStages))}
-      </tbody>
-    </table>
-  }
-
-  private def poolRow(p: Schedulable, poolToActiveStages: HashMap[String, HashSet[Stage]])
-    : Seq[Node] = {
-    val activeStages = poolToActiveStages.get(p.name) match {
-      case Some(stages) => stages.size
-      case None => 0
-    }
-    <tr>
-      <td><a href={"/stages/pool?poolname=%s".format(p.name)}>{p.name}</a></td>
-      <td>{p.minShare}</td>
-      <td>{p.weight}</td>
-      <td>{activeStages}</td>
-      <td>{p.runningTasks}</td>
-      <td>{p.schedulingMode}</td>
-    </tr>
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
deleted file mode 100644
index c234147..0000000
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.ui.jobs
-
-import java.util.Date
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
-
-import spark.ui.UIUtils._
-import spark.ui.Page._
-import spark.util.Distribution
-import spark.{ExceptionFailure, Utils}
-import spark.scheduler.cluster.TaskInfo
-import spark.executor.TaskMetrics
-
-/** Page showing statistics and task list for a given stage */
-private[spark] class StagePage(parent: JobProgressUI) {
-  def listener = parent.listener
-  val dateFmt = parent.dateFmt
-
-  def render(request: HttpServletRequest): Seq[Node] = {
-    listener.synchronized {
-      val stageId = request.getParameter("id").toInt
-      val now = System.currentTimeMillis()
-
-      if (!listener.stageToTaskInfos.contains(stageId)) {
-        val content =
-          <div>
-            <h4>Summary Metrics</h4> No tasks have started yet
-            <h4>Tasks</h4> No tasks have started yet
-          </div>
-        return headerSparkPage(content, parent.sc, "Details for Stage %s".format(stageId), Stages)
-      }
-
-      val tasks = listener.stageToTaskInfos(stageId).toSeq.sortBy(_._1.launchTime)
-
-      val numCompleted = tasks.count(_._1.finished)
-      val shuffleReadBytes = listener.stageToShuffleRead.getOrElse(stageId, 0L)
-      val hasShuffleRead = shuffleReadBytes > 0
-      val shuffleWriteBytes = listener.stageToShuffleWrite.getOrElse(stageId, 0L)
-      val hasShuffleWrite = shuffleWriteBytes > 0
-
-      var activeTime = 0L
-      listener.stageToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
-
-      val summary =
-        <div>
-          <ul class="unstyled">
-            <li>
-              <strong>CPU time: </strong>
-              {parent.formatDuration(listener.stageToTime.getOrElse(stageId, 0L) + activeTime)}
-            </li>
-            {if (hasShuffleRead)
-              <li>
-                <strong>Shuffle read: </strong>
-                {Utils.bytesToString(shuffleReadBytes)}
-              </li>
-            }
-            {if (hasShuffleWrite)
-              <li>
-                <strong>Shuffle write: </strong>
-                {Utils.bytesToString(shuffleWriteBytes)}
-              </li>
-            }
-          </ul>
-        </div>
-
-      val taskHeaders: Seq[String] =
-        Seq("Task ID", "Status", "Locality Level", "Executor", "Launch Time", "Duration") ++
-        Seq("GC Time") ++
-        {if (hasShuffleRead) Seq("Shuffle Read")  else Nil} ++
-        {if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++
-        Seq("Errors")
-
-      val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
-
-      // Excludes tasks which failed and have incomplete metrics
-      val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined))
-
-      val summaryTable: Option[Seq[Node]] =
-        if (validTasks.size == 0) {
-          None
-        }
-        else {
-          val serviceTimes = validTasks.map{case (info, metrics, exception) =>
-            metrics.get.executorRunTime.toDouble}
-          val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
-            ms => parent.formatDuration(ms.toLong))
-
-          def getQuantileCols(data: Seq[Double]) =
-            Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong))
-
-          val shuffleReadSizes = validTasks.map {
-            case(info, metrics, exception) =>
-              metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
-          }
-          val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes)
-
-          val shuffleWriteSizes = validTasks.map {
-            case(info, metrics, exception) =>
-              metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
-          }
-          val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
-
-          val listings: Seq[Seq[String]] = Seq(serviceQuantiles,
-            if (hasShuffleRead) shuffleReadQuantiles else Nil,
-            if (hasShuffleWrite) shuffleWriteQuantiles else Nil)
-
-          val quantileHeaders = Seq("Metric", "Min", "25th percentile",
-            "Median", "75th percentile", "Max")
-          def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
-          Some(listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true))
-        }
-
-      val content =
-        summary ++
-        <h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
-        <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
-        <h4>Tasks</h4> ++ taskTable;
-
-      headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages)
-    }
-  }
-
-
-  def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean)
-             (taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = {
-    def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
-      trace.map(e => <span style="display:block;">{e.toString}</span>)
-    val (info, metrics, exception) = taskData
-
-    val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
-      else metrics.map(m => m.executorRunTime).getOrElse(1)
-    val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
-      else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
-    val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
-
-    <tr>
-      <td>{info.taskId}</td>
-      <td>{info.status}</td>
-      <td>{info.taskLocality}</td>
-      <td>{info.host}</td>
-      <td>{dateFmt.format(new Date(info.launchTime))}</td>
-      <td sorttable_customkey={duration.toString}>
-        {formatDuration}
-      </td>
-      <td sorttable_customkey={gcTime.toString}>
-        {if (gcTime > 0) parent.formatDuration(gcTime) else ""}
-      </td>
-      {if (shuffleRead) {
-        <td>{metrics.flatMap{m => m.shuffleReadMetrics}.map{s =>
-          Utils.bytesToString(s.remoteBytesRead)}.getOrElse("")}</td>
-      }}
-      {if (shuffleWrite) {
-        <td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
-          Utils.bytesToString(s.shuffleBytesWritten)}.getOrElse("")}</td>
-      }}
-      <td>{exception.map(e =>
-        <span>
-          {e.className} ({e.description})<br/>
-          {fmtStackTrace(e.stackTrace)}
-        </span>).getOrElse("")}
-      </td>
-    </tr>
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala
deleted file mode 100644
index 2b1bc98..0000000
--- a/core/src/main/scala/spark/ui/jobs/StageTable.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-package spark.ui.jobs
-
-import java.util.Date
-
-import scala.xml.Node
-import scala.collection.mutable.HashSet
-
-import spark.Utils
-import spark.scheduler.cluster.{SchedulingMode, TaskInfo}
-import spark.scheduler.Stage
-
-
-/** Page showing list of all ongoing and recently finished stages */
-private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressUI) {
-
-  val listener = parent.listener
-  val dateFmt = parent.dateFmt
-  val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
-
-  def toNodeSeq(): Seq[Node] = {
-    listener.synchronized {
-      stageTable(stageRow, stages)
-    }
-  }
-
-  /** Special table which merges two header cells. */
-  private def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
-    <table class="table table-bordered table-striped table-condensed sortable">
-      <thead>
-        <th>Stage Id</th>
-        {if (isFairScheduler) {<th>Pool Name</th>} else {}}
-        <th>Description</th>
-        <th>Submitted</th>
-        <th>Duration</th>
-        <th>Tasks: Succeeded/Total</th>
-        <th>Shuffle Read</th>
-        <th>Shuffle Write</th>
-      </thead>
-      <tbody>
-        {rows.map(r => makeRow(r))}
-      </tbody>
-    </table>
-  }
-
-  private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] = {
-    val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
-    val startWidth = "width: %s%%".format((started.toDouble/total)*100)
-
-    <div class="progress">
-      <span style="text-align:center; position:absolute; width:100%;">
-        {completed}/{total} {failed}
-      </span>
-      <div class="bar bar-completed" style={completeWidth}></div>
-      <div class="bar bar-running" style={startWidth}></div>
-    </div>
-  }
-
-
-  private def stageRow(s: Stage): Seq[Node] = {
-    val submissionTime = s.submissionTime match {
-      case Some(t) => dateFmt.format(new Date(t))
-      case None => "Unknown"
-    }
-
-    val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match {
-      case 0 => ""
-      case b => Utils.bytesToString(b)
-    }
-    val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match {
-      case 0 => ""
-      case b => Utils.bytesToString(b)
-    }
-
-    val startedTasks = listener.stageToTasksActive.getOrElse(s.id, HashSet[TaskInfo]()).size
-    val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
-    val failedTasks = listener.stageToTasksFailed.getOrElse(s.id, 0) match {
-        case f if f > 0 => "(%s failed)".format(f)
-        case _ => ""
-    }
-    val totalTasks = s.numPartitions
-
-    val poolName = listener.stageToPool.get(s)
-
-    val nameLink = <a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a>
-    val description = listener.stageToDescription.get(s)
-      .map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
-    val finishTime = s.completionTime.getOrElse(System.currentTimeMillis())
-    val duration =  s.submissionTime.map(t => finishTime - t)
-
-    <tr>
-      <td>{s.id}</td>
-      {if (isFairScheduler) {
-        <td><a href={"/stages/pool?poolname=%s".format(poolName.get)}>{poolName.get}</a></td>}
-      }
-      <td>{description}</td>
-      <td valign="middle">{submissionTime}</td>
-      <td sorttable_customkey={duration.getOrElse(-1).toString}>
-        {duration.map(d => parent.formatDuration(d)).getOrElse("Unknown")}
-      </td>
-      <td class="progress-cell">
-        {makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)}
-      </td>
-      <td>{shuffleRead}</td>
-      <td>{shuffleWrite}</td>
-    </tr>
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala
deleted file mode 100644
index 49ed069..0000000
--- a/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.ui.storage
-
-import akka.util.Duration
-
-import javax.servlet.http.HttpServletRequest
-
-import org.eclipse.jetty.server.Handler
-
-import spark.{Logging, SparkContext}
-import spark.ui.JettyUtils._
-
-/** Web UI showing storage status of all RDD's in the given SparkContext. */
-private[spark] class BlockManagerUI(val sc: SparkContext) extends Logging {
-  implicit val timeout = Duration.create(
-    System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
-
-  val indexPage = new IndexPage(this)
-  val rddPage = new RDDPage(this)
-
-  def getHandlers = Seq[(String, Handler)](
-    ("/storage/rdd", (request: HttpServletRequest) => rddPage.render(request)),
-    ("/storage", (request: HttpServletRequest) => indexPage.render(request))
-  )
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/ui/storage/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/ui/storage/IndexPage.scala b/core/src/main/scala/spark/ui/storage/IndexPage.scala
deleted file mode 100644
index fc6273c..0000000
--- a/core/src/main/scala/spark/ui/storage/IndexPage.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.ui.storage
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
-
-import spark.storage.{RDDInfo, StorageUtils}
-import spark.Utils
-import spark.ui.UIUtils._
-import spark.ui.Page._
-
-/** Page showing list of RDD's currently stored in the cluster */
-private[spark] class IndexPage(parent: BlockManagerUI) {
-  val sc = parent.sc
-
-  def render(request: HttpServletRequest): Seq[Node] = {
-    val storageStatusList = sc.getExecutorStorageStatus
-    // Calculate macro-level statistics
-
-    val rddHeaders = Seq(
-      "RDD Name",
-      "Storage Level",
-      "Cached Partitions",
-      "Fraction Cached",
-      "Size in Memory",
-      "Size on Disk")
-    val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
-    val content = listingTable(rddHeaders, rddRow, rdds)
-
-    headerSparkPage(content, parent.sc, "Storage ", Storage)
-  }
-
-  def rddRow(rdd: RDDInfo): Seq[Node] = {
-    <tr>
-      <td>
-        <a href={"/storage/rdd?id=%s".format(rdd.id)}>
-          {rdd.name}
-        </a>
-      </td>
-      <td>{rdd.storageLevel.description}
-      </td>
-      <td>{rdd.numCachedPartitions}</td>
-      <td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td>
-      <td>{Utils.bytesToString(rdd.memSize)}</td>
-      <td>{Utils.bytesToString(rdd.diskSize)}</td>
-    </tr>
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/ui/storage/RDDPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/ui/storage/RDDPage.scala b/core/src/main/scala/spark/ui/storage/RDDPage.scala
deleted file mode 100644
index b128a56..0000000
--- a/core/src/main/scala/spark/ui/storage/RDDPage.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.ui.storage
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
-
-import spark.Utils
-import spark.storage.{StorageStatus, StorageUtils}
-import spark.storage.BlockManagerMasterActor.BlockStatus
-import spark.ui.UIUtils._
-import spark.ui.Page._
-
-
-/** Page showing storage details for a given RDD */
-private[spark] class RDDPage(parent: BlockManagerUI) {
-  val sc = parent.sc
-
-  def render(request: HttpServletRequest): Seq[Node] = {
-    val id = request.getParameter("id")
-    val prefix = "rdd_" + id.toString
-    val storageStatusList = sc.getExecutorStorageStatus
-    val filteredStorageStatusList = StorageUtils.
-      filterStorageStatusByPrefix(storageStatusList, prefix)
-    val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
-
-    val workerHeaders = Seq("Host", "Memory Usage", "Disk Usage")
-    val workers = filteredStorageStatusList.map((prefix, _))
-    val workerTable = listingTable(workerHeaders, workerRow, workers)
-
-    val blockHeaders = Seq("Block Name", "Storage Level", "Size in Memory", "Size on Disk",
-      "Executors")
-
-    val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1)
-    val blockLocations = StorageUtils.blockLocationsFromStorageStatus(filteredStorageStatusList)
-    val blocks = blockStatuses.map {
-      case(id, status) => (id, status, blockLocations.get(id).getOrElse(Seq("UNKNOWN")))
-    }
-    val blockTable = listingTable(blockHeaders, blockRow, blocks)
-
-    val content =
-      <div class="row-fluid">
-        <div class="span12">
-          <ul class="unstyled">
-            <li>
-              <strong>Storage Level:</strong>
-              {rddInfo.storageLevel.description}
-            </li>
-            <li>
-              <strong>Cached Partitions:</strong>
-              {rddInfo.numCachedPartitions}
-            </li>
-            <li>
-              <strong>Total Partitions:</strong>
-              {rddInfo.numPartitions}
-            </li>
-            <li>
-              <strong>Memory Size:</strong>
-              {Utils.bytesToString(rddInfo.memSize)}
-            </li>
-            <li>
-              <strong>Disk Size:</strong>
-              {Utils.bytesToString(rddInfo.diskSize)}
-            </li>
-          </ul>
-        </div>
-      </div>
-
-      <div class="row-fluid">
-        <div class="span12">
-          <h4> Data Distribution on {workers.size} Executors </h4>
-          {workerTable}
-        </div>
-      </div>
-
-      <div class="row-fluid">
-        <div class="span12">
-          <h4> {blocks.size} Partitions </h4>
-          {blockTable}
-        </div>
-      </div>;
-
-    headerSparkPage(content, parent.sc, "RDD Storage Info for " + rddInfo.name, Storage)
-  }
-
-  def blockRow(row: (String, BlockStatus, Seq[String])): Seq[Node] = {
-    val (id, block, locations) = row
-    <tr>
-      <td>{id}</td>
-      <td>
-        {block.storageLevel.description}
-      </td>
-      <td sorttable_customkey={block.memSize.toString}>
-        {Utils.bytesToString(block.memSize)}
-      </td>
-      <td sorttable_customkey={block.diskSize.toString}>
-        {Utils.bytesToString(block.diskSize)}
-      </td>
-      <td>
-        {locations.map(l => <span>{l}<br/></span>)}
-      </td>
-    </tr>
-  }
-
-  def workerRow(worker: (String, StorageStatus)): Seq[Node] = {
-    val (prefix, status) = worker
-    <tr>
-      <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td>
-      <td>
-        {Utils.bytesToString(status.memUsed(prefix))}
-        ({Utils.bytesToString(status.memRemaining)} Remaining)
-      </td>
-      <td>{Utils.bytesToString(status.diskUsed(prefix))}</td>
-    </tr>
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
deleted file mode 100644
index 9233277..0000000
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-import akka.actor.{ActorSystem, ExtendedActorSystem}
-import com.typesafe.config.ConfigFactory
-import akka.util.duration._
-import akka.remote.RemoteActorRefProvider
-
-
-/**
- * Various utility classes for working with Akka.
- */
-private[spark] object AkkaUtils {
-
-  /**
-   * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
-   * ActorSystem itself and its port (which is hard to get from Akka).
-   *
-   * Note: the `name` parameter is important, as even if a client sends a message to right
-   * host + port, if the system name is incorrect, Akka will drop the message.
-   */
-  def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
-    val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
-    val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
-    val akkaTimeout = System.getProperty("spark.akka.timeout", "60").toInt
-    val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
-    val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
-    // 10 seconds is the default akka timeout, but in a cluster, we need higher by default.
-    val akkaWriteTimeout = System.getProperty("spark.akka.writeTimeout", "30").toInt
-    
-    val akkaConf = ConfigFactory.parseString("""
-      akka.daemonic = on
-      akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
-      akka.stdout-loglevel = "ERROR"
-      akka.actor.provider = "akka.remote.RemoteActorRefProvider"
-      akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
-      akka.remote.netty.hostname = "%s"
-      akka.remote.netty.port = %d
-      akka.remote.netty.connection-timeout = %ds
-      akka.remote.netty.message-frame-size = %d MiB
-      akka.remote.netty.execution-pool-size = %d
-      akka.actor.default-dispatcher.throughput = %d
-      akka.remote.log-remote-lifecycle-events = %s
-      akka.remote.netty.write-timeout = %ds
-      """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize,
-        lifecycleEvents, akkaWriteTimeout))
-
-    val actorSystem = ActorSystem(name, akkaConf)
-
-    // Figure out the port number we bound to, in case port was passed as 0. This is a bit of a
-    // hack because Akka doesn't let you figure out the port through the public API yet.
-    val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
-    val boundPort = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get
-    return (actorSystem, boundPort)
-  }
-}
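
The createActorSystem helper above returns both the ActorSystem and the port Akka actually bound, which matters when the caller passes port 0 to request any free port. A minimal usage sketch (the system name and host are illustrative; since AkkaUtils is private[spark], this assumes a call site under the spark package):

  import spark.util.AkkaUtils

  // Port 0 means "pick any free port"; the helper returns the port that was really bound.
  val (actorSystem, boundPort) = AkkaUtils.createActorSystem("driver", "localhost", 0)
  println("Actor system bound to port " + boundPort)
  actorSystem.shutdown()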

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/BoundedPriorityQueue.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/BoundedPriorityQueue.scala b/core/src/main/scala/spark/util/BoundedPriorityQueue.scala
deleted file mode 100644
index 0575497..0000000
--- a/core/src/main/scala/spark/util/BoundedPriorityQueue.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-import java.io.Serializable
-import java.util.{PriorityQueue => JPriorityQueue}
-import scala.collection.generic.Growable
-import scala.collection.JavaConverters._
-
-/**
- * Bounded priority queue. This class wraps the original PriorityQueue
- * class and modifies it such that only the top K elements are retained.
- * The top K elements are defined by an implicit Ordering[A].
- */
-class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
-  extends Iterable[A] with Growable[A] with Serializable {
-
-  private val underlying = new JPriorityQueue[A](maxSize, ord)
-
-  override def iterator: Iterator[A] = underlying.iterator.asScala
-
-  override def ++=(xs: TraversableOnce[A]): this.type = {
-    xs.foreach { this += _ }
-    this
-  }
-
-  override def +=(elem: A): this.type = {
-    if (size < maxSize) underlying.offer(elem)
-    else maybeReplaceLowest(elem)
-    this
-  }
-
-  override def +=(elem1: A, elem2: A, elems: A*): this.type = {
-    this += elem1 += elem2 ++= elems
-  }
-
-  override def clear() { underlying.clear() }
-
-  private def maybeReplaceLowest(a: A): Boolean = {
-    val head = underlying.peek()
-    if (head != null && ord.gt(a, head)) {
-      underlying.poll()
-      underlying.offer(a)
-    } else false
-  }
-}
-
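
As its scaladoc says, the queue above retains only the top K elements under the implicit Ordering. A small illustrative sketch (the values are arbitrary):

  import spark.util.BoundedPriorityQueue

  val topK = new BoundedPriorityQueue[Int](3)
  topK ++= Seq(5, 1, 9, 7, 3)
  // Only the three largest values survive; smaller ones are dropped or evicted.
  println(topK.toSeq.sorted)   // List(5, 7, 9)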

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/ByteBufferInputStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/ByteBufferInputStream.scala b/core/src/main/scala/spark/util/ByteBufferInputStream.scala
deleted file mode 100644
index 47a28e2..0000000
--- a/core/src/main/scala/spark/util/ByteBufferInputStream.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-import java.io.InputStream
-import java.nio.ByteBuffer
-import spark.storage.BlockManager
-
-/**
- * Reads data from a ByteBuffer, and optionally cleans it up using BlockManager.dispose()
- * at the end of the stream (e.g. to close a memory-mapped file).
- */
-private[spark]
-class ByteBufferInputStream(private var buffer: ByteBuffer, dispose: Boolean = false)
-  extends InputStream {
-
-  override def read(): Int = {
-    if (buffer == null || buffer.remaining() == 0) {
-      cleanUp()
-      -1
-    } else {
-      buffer.get() & 0xFF
-    }
-  }
-
-  override def read(dest: Array[Byte]): Int = {
-    read(dest, 0, dest.length)
-  }
-
-  override def read(dest: Array[Byte], offset: Int, length: Int): Int = {
-    if (buffer == null || buffer.remaining() == 0) {
-      cleanUp()
-      -1
-    } else {
-      val amountToGet = math.min(buffer.remaining(), length)
-      buffer.get(dest, offset, amountToGet)
-      amountToGet
-    }
-  }
-
-  override def skip(bytes: Long): Long = {
-    if (buffer != null) {
-      val amountToSkip = math.min(bytes, buffer.remaining).toInt
-      buffer.position(buffer.position + amountToSkip)
-      if (buffer.remaining() == 0) {
-        cleanUp()
-      }
-      amountToSkip
-    } else {
-      0L
-    }
-  }
-
-  /**
-   * Clean up the buffer, and potentially dispose of it using BlockManager.dispose().
-   */
-  private def cleanUp() {
-    if (buffer != null) {
-      if (dispose) {
-        BlockManager.dispose(buffer)
-      }
-      buffer = null
-    }
-  }
-}
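
A minimal sketch of reading a ByteBuffer through the stream above (since the class is private[spark], this assumes code living under the spark package; the string is illustrative):

  import java.nio.ByteBuffer
  import spark.util.ByteBufferInputStream

  val buf = ByteBuffer.wrap("hello".getBytes("UTF-8"))
  val in = new ByteBufferInputStream(buf)   // dispose defaults to false
  val bytes = Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
  println(new String(bytes, "UTF-8"))       // hello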

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/Clock.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/Clock.scala b/core/src/main/scala/spark/util/Clock.scala
deleted file mode 100644
index aa71a5b..0000000
--- a/core/src/main/scala/spark/util/Clock.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-/**
- * An interface to represent clocks, so that they can be mocked out in unit tests.
- */
-private[spark] trait Clock {
-  def getTime(): Long
-}
-
-private[spark] object SystemClock extends Clock {
-  def getTime(): Long = System.currentTimeMillis()
-}
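
The trait exists so tests can swap in a controllable clock, as its scaladoc says. A hypothetical test helper, assuming it lives under the spark package since the trait is private[spark]:

  // Illustrative only: a hand-driven clock for unit tests.
  class ManualClock(var now: Long) extends Clock {
    def getTime(): Long = now
    def advance(ms: Long) { now += ms }
  }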

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/CompletionIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/CompletionIterator.scala b/core/src/main/scala/spark/util/CompletionIterator.scala
deleted file mode 100644
index 2104508..0000000
--- a/core/src/main/scala/spark/util/CompletionIterator.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-/**
- * Wrapper around an iterator which calls a completion method after it successfully iterates through all the elements
- */
-abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{
-  def next = sub.next
-  def hasNext = {
-    val r = sub.hasNext
-    if (!r) {
-      completion
-    }
-    r
-  }
-
-  def completion()
-}
-
-object CompletionIterator {
-  def apply[A, I <: Iterator[A]](sub: I, completionFunction: => Unit) : CompletionIterator[A,I] = {
-    new CompletionIterator[A,I](sub) {
-      def completion() = completionFunction
-    }
-  }
-}
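
A minimal sketch of the companion object's apply, which runs the by-name completion block once the wrapped iterator is exhausted (the printlns are illustrative):

  import spark.util.CompletionIterator

  val source = Iterator(1, 2, 3)
  val it = CompletionIterator[Int, Iterator[Int]](source, println("done reading"))
  it.foreach(println)   // prints 1, 2, 3, then "done reading" after the final hasNext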

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/Distribution.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/Distribution.scala b/core/src/main/scala/spark/util/Distribution.scala
deleted file mode 100644
index 5d4d7a6..0000000
--- a/core/src/main/scala/spark/util/Distribution.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-import java.io.PrintStream
-
-/**
- * Util for getting some stats from a small sample of numeric values, with some handy summary functions.
- *
- * Entirely in memory, not intended as a good way to compute stats over large data sets.
- *
- * Assumes you are giving it a non-empty set of data
- */
-class Distribution(val data: Array[Double], val startIdx: Int, val endIdx: Int) {
-  require(startIdx < endIdx)
-  def this(data: Traversable[Double]) = this(data.toArray, 0, data.size)
-  java.util.Arrays.sort(data, startIdx, endIdx)
-  val length = endIdx - startIdx
-
-  val defaultProbabilities = Array(0,0.25,0.5,0.75,1.0)
-
-  /**
-   * Get the value of the distribution at the given probabilities.  Probabilities should be
-   * given from 0 to 1
-   * @param probabilities
-   */
-  def getQuantiles(probabilities: Traversable[Double] = defaultProbabilities) = {
-    probabilities.toIndexedSeq.map{p:Double => data(closestIndex(p))}
-  }
-
-  private def closestIndex(p: Double) = {
-    math.min((p * length).toInt + startIdx, endIdx - 1)
-  }
-
-  def showQuantiles(out: PrintStream = System.out) = {
-    out.println("min\t25%\t50%\t75%\tmax")
-    getQuantiles(defaultProbabilities).foreach{q => out.print(q + "\t")}
-    out.println
-  }
-
-  def statCounter = StatCounter(data.slice(startIdx, endIdx))
-
-  /**
-   * print a summary of this distribution to the given PrintStream.
-   * @param out
-   */
-  def summary(out: PrintStream = System.out) {
-    out.println(statCounter)
-    showQuantiles(out)
-  }
-}
-
-object Distribution {
-
-  def apply(data: Traversable[Double]): Option[Distribution] = {
-    if (data.size > 0)
-      Some(new Distribution(data))
-    else
-      None
-  }
-
-  def showQuantiles(out: PrintStream = System.out, quantiles: Traversable[Double]) {
-    out.println("min\t25%\t50%\t75%\tmax")
-    quantiles.foreach{q => out.print(q + "\t")}
-    out.println
-  }
-}
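
A small sketch of the companion object's apply, which returns None for empty input and otherwise exposes quantiles at the default probabilities (0, 0.25, 0.5, 0.75, 1.0); the sample values are illustrative:

  import spark.util.Distribution

  val serviceTimes = Seq(12.0, 15.0, 9.0, 30.0, 22.0)
  Distribution(serviceTimes) match {
    case Some(d) => d.getQuantiles().foreach(println)   // min, 25%, median, 75%, max
    case None    => println("no data points")
  }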

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/IdGenerator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/IdGenerator.scala b/core/src/main/scala/spark/util/IdGenerator.scala
deleted file mode 100644
index 3422280..0000000
--- a/core/src/main/scala/spark/util/IdGenerator.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-import java.util.concurrent.atomic.AtomicInteger
-
-/**
- * A util used to get a unique generation ID. This is a wrapper around Java's
- * AtomicInteger. An example usage is in BlockManager, where each BlockManager
- * instance would start an Akka actor and we use this utility to assign the Akka
- * actors unique names.
- */
-private[spark] class IdGenerator {
-  private var id = new AtomicInteger
-  def next: Int = id.incrementAndGet
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/IntParam.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/IntParam.scala b/core/src/main/scala/spark/util/IntParam.scala
deleted file mode 100644
index daf0d58..0000000
--- a/core/src/main/scala/spark/util/IntParam.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-/**
- * An extractor object for parsing strings into integers.
- */
-private[spark] object IntParam {
-  def unapply(str: String): Option[Int] = {
-    try {
-      Some(str.toInt)
-    } catch {
-      case e: NumberFormatException => None
-    }
-  }
-}
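
The extractor is meant for pattern matches over command-line arguments. A brief sketch (the argument names are illustrative; since the object is private[spark], this assumes a call site under the spark package):

  import spark.util.IntParam

  def parsePort(arg: String) = arg match {
    case IntParam(port) => println("using port " + port)
    case other          => println("not a number: " + other)
  }
  parsePort("8080")    // using port 8080
  parsePort("eight")   // not a number: eight

MemoryParam below follows the same extractor pattern, but parses memory strings such as "10g" into megabytes via Utils.memoryStringToMb.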

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/MemoryParam.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/MemoryParam.scala b/core/src/main/scala/spark/util/MemoryParam.scala
deleted file mode 100644
index 2985623..0000000
--- a/core/src/main/scala/spark/util/MemoryParam.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-import spark.Utils
-
-/**
- * An extractor object for parsing JVM memory strings, such as "10g", into an Int representing
- * the number of megabytes. Supports the same formats as Utils.memoryStringToMb.
- */
-private[spark] object MemoryParam {
-  def unapply(str: String): Option[Int] = {
-    try {
-      Some(Utils.memoryStringToMb(str))
-    } catch {
-      case e: NumberFormatException => None
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/MetadataCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala
deleted file mode 100644
index 92909e0..0000000
--- a/core/src/main/scala/spark/util/MetadataCleaner.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-import java.util.concurrent.{TimeUnit, ScheduledFuture, Executors}
-import java.util.{TimerTask, Timer}
-import spark.Logging
-
-
-/**
- * Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
- */
-class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
-  private val delaySeconds = MetadataCleaner.getDelaySeconds
-  private val periodSeconds = math.max(10, delaySeconds / 10)
-  private val timer = new Timer(name + " cleanup timer", true)
-
-  private val task = new TimerTask {
-    override def run() {
-      try {
-        cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
-        logInfo("Ran metadata cleaner for " + name)
-      } catch {
-        case e: Exception => logError("Error running cleanup task for " + name, e)
-      }
-    }
-  }
-
-  if (delaySeconds > 0) {
-    logDebug(
-      "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds " +
-      "and period of " + periodSeconds + " secs")
-    timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)
-  }
-
-  def cancel() {
-    timer.cancel()
-  }
-}
-
-
-object MetadataCleaner {
-  def getDelaySeconds = System.getProperty("spark.cleaner.ttl", "-1").toInt
-  def setDelaySeconds(delay: Int) { System.setProperty("spark.cleaner.ttl", delay.toString) }
-}
-
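
A sketch of how a component might wire up a MetadataCleaner (the component name and TTL value are hypothetical). Note that the delay is read from spark.cleaner.ttl at construction time, so it must be set beforehand, and the timer only starts when the delay is positive:

  import spark.util.MetadataCleaner

  MetadataCleaner.setDelaySeconds(3600)   // keep metadata for one hour
  val cleaner = new MetadataCleaner("exampleComponent",
    cleanupTime => println("dropping entries older than " + cleanupTime))
  // ... when the owning component shuts down:
  cleaner.cancel()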

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/MutablePair.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/MutablePair.scala b/core/src/main/scala/spark/util/MutablePair.scala
deleted file mode 100644
index 78d404e..0000000
--- a/core/src/main/scala/spark/util/MutablePair.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-
-/**
- * A tuple of 2 elements. This can be used as an alternative to Scala's Tuple2 when we want to
- * minimize object allocation.
- *
- * @param  _1   Element 1 of this MutablePair
- * @param  _2   Element 2 of this MutablePair
- */
-case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T1,
-                      @specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T2]
-  (var _1: T1, var _2: T2)
-  extends Product2[T1, T2]
-{
-  override def toString = "(" + _1 + "," + _2 + ")"
-
-  override def canEqual(that: Any): Boolean = that.isInstanceOf[MutablePair[_,_]]
-}
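
An illustrative sketch of reusing a single MutablePair across records to avoid per-element Tuple2 allocation (the record data is made up); because the same instance is returned each time, downstream code must not hold on to it:

  import spark.util.MutablePair

  val pair = new MutablePair[Int, String](0, "")
  val records = Iterator((1, "a"), (2, "b"), (3, "c"))
  val reused = records.map { case (k, v) =>
    pair._1 = k
    pair._2 = v
    pair              // same object every iteration
  }
  reused.foreach(println)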

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/NextIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/NextIterator.scala b/core/src/main/scala/spark/util/NextIterator.scala
deleted file mode 100644
index 22163ec..0000000
--- a/core/src/main/scala/spark/util/NextIterator.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-/** Provides a basic/boilerplate Iterator implementation. */
-private[spark] abstract class NextIterator[U] extends Iterator[U] {
-  
-  private var gotNext = false
-  private var nextValue: U = _
-  private var closed = false
-  protected var finished = false
-
-  /**
-   * Method for subclasses to implement to provide the next element.
-   *
-   * If no next element is available, the subclass should set `finished`
-   * to `true` and may return any value (it will be ignored).
-   *
-   * This convention is required because `null` may be a valid value,
-   * and using `Option` seems like it might create unnecessary Some/None
-   * instances, given some iterators might be called in a tight loop.
-   * 
-   * @return the next element, or an arbitrary value after setting 'finished' to true
-   */
-  protected def getNext(): U
-
-  /**
-   * Method for subclasses to implement when all elements have been successfully
-   * iterated, and the iteration is done.
-   *
-   * <b>Note:</b> `NextIterator` cannot guarantee that `close` will be
-   * called because it has no control over what happens when an exception
-   * happens in the user code that is calling hasNext/next.
-   *
-   * Ideally you should have another try/catch, as in HadoopRDD, that
-   * ensures any resources are closed should iteration fail.
-   */
-  protected def close()
-
-  /**
-   * Calls the subclass-defined close method, but only once.
-   *
-   * Usually calling `close` multiple times should be fine, but historically
-   * there have been issues with some InputFormats throwing exceptions.
-   */
-  def closeIfNeeded() {
-    if (!closed) {
-      close()
-      closed = true
-    }
-  }
-
-  override def hasNext: Boolean = {
-    if (!finished) {
-      if (!gotNext) {
-        nextValue = getNext()
-        if (finished) {
-          closeIfNeeded()
-        }
-        gotNext = true
-      }
-    }
-    !finished
-  }
-
-  override def next(): U = {
-    if (!hasNext) {
-      throw new NoSuchElementException("End of stream")
-    }
-    gotNext = false
-    nextValue
-  }
-}
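
A minimal subclass sketch (not from the codebase) showing the intended pattern: getNext sets finished when the source is exhausted, and close releases the underlying resource exactly once via closeIfNeeded:

  import java.io.BufferedReader
  import spark.util.NextIterator

  class LineIterator(reader: BufferedReader) extends NextIterator[String] {
    override protected def getNext(): String = {
      val line = reader.readLine()
      if (line == null) finished = true   // signal end of stream; return value is ignored
      line
    }
    override protected def close() {
      reader.close()
    }
  }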

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/RateLimitedOutputStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/RateLimitedOutputStream.scala b/core/src/main/scala/spark/util/RateLimitedOutputStream.scala
deleted file mode 100644
index 00f782b..0000000
--- a/core/src/main/scala/spark/util/RateLimitedOutputStream.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-import scala.annotation.tailrec
-
-import java.io.OutputStream
-import java.util.concurrent.TimeUnit._
-
-class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends OutputStream {
-  val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
-  val CHUNK_SIZE = 8192
-  var lastSyncTime = System.nanoTime
-  var bytesWrittenSinceSync: Long = 0
-
-  override def write(b: Int) {
-    waitToWrite(1)
-    out.write(b)
-  }
-
-  override def write(bytes: Array[Byte]) {
-    write(bytes, 0, bytes.length)
-  }
-
-  @tailrec
-  override final def write(bytes: Array[Byte], offset: Int, length: Int) {
-    val writeSize = math.min(length - offset, CHUNK_SIZE)
-    if (writeSize > 0) {
-      waitToWrite(writeSize)
-      out.write(bytes, offset, writeSize)
-      write(bytes, offset + writeSize, length)
-    }
-  }
-
-  override def flush() {
-    out.flush()
-  }
-
-  override def close() {
-    out.close()
-  }
-
-  @tailrec
-  private def waitToWrite(numBytes: Int) {
-    val now = System.nanoTime
-    val elapsedSecs = SECONDS.convert(math.max(now - lastSyncTime, 1), NANOSECONDS)
-    val rate = bytesWrittenSinceSync.toDouble / elapsedSecs
-    if (rate < bytesPerSec) {
-      // It's okay to write; just update some variables and return
-      bytesWrittenSinceSync += numBytes
-      if (now > lastSyncTime + SYNC_INTERVAL) {
-        // Sync interval has passed; let's resync
-        lastSyncTime = now
-        bytesWrittenSinceSync = numBytes
-      }
-    } else {
-      // Calculate how much time we should sleep to bring ourselves to the desired rate.
-      // Based on throttler in Kafka (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala)
-      val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS)
-      if (sleepTime > 0) Thread.sleep(sleepTime)
-      waitToWrite(numBytes)
-    }
-  }
-}
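
A brief usage sketch (the file path and rate are hypothetical): wrap any OutputStream and the write calls block as needed to stay near the target rate, measured over the 10-second sync interval rather than per call:

  import java.io.FileOutputStream
  import spark.util.RateLimitedOutputStream

  val throttled = new RateLimitedOutputStream(
    new FileOutputStream("/tmp/example.bin"), 1024 * 1024)   // roughly 1 MB/s
  throttled.write("hello".getBytes("UTF-8"))
  throttled.flush()
  throttled.close()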

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/SerializableBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/SerializableBuffer.scala b/core/src/main/scala/spark/util/SerializableBuffer.scala
deleted file mode 100644
index 7e68426..0000000
--- a/core/src/main/scala/spark/util/SerializableBuffer.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-import java.nio.ByteBuffer
-import java.io.{IOException, ObjectOutputStream, EOFException, ObjectInputStream}
-import java.nio.channels.Channels
-
-/**
- * A wrapper around a java.nio.ByteBuffer that is serializable through Java serialization, to make
- * it easier to pass ByteBuffers in case class messages.
- */
-private[spark]
-class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable {
-  def value = buffer
-
-  private def readObject(in: ObjectInputStream) {
-    val length = in.readInt()
-    buffer = ByteBuffer.allocate(length)
-    var amountRead = 0
-    val channel = Channels.newChannel(in)
-    while (amountRead < length) {
-      val ret = channel.read(buffer)
-      if (ret == -1) {
-        throw new EOFException("End of file before fully reading buffer")
-      }
-      amountRead += ret
-    }
-    buffer.rewind() // Allow us to read it later
-  }
-
-  private def writeObject(out: ObjectOutputStream) {
-    out.writeInt(buffer.limit())
-    if (Channels.newChannel(out).write(buffer) != buffer.limit()) {
-      throw new IOException("Could not fully write buffer to output stream")
-    }
-    buffer.rewind() // Allow us to write it again later
-  }
-}
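
A sketch of the intended use: wrap a ByteBuffer so it can travel inside a Java-serializable message (the TaskPayload case class here is purely illustrative):

  import java.nio.ByteBuffer
  import spark.util.SerializableBuffer

  case class TaskPayload(data: SerializableBuffer)   // hypothetical message type

  val buf = ByteBuffer.wrap(Array[Byte](1, 2, 3))
  val msg = TaskPayload(new SerializableBuffer(buf))
  val restored: ByteBuffer = msg.data.value          // access the wrapped buffer after deserialization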

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/StatCounter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/StatCounter.scala b/core/src/main/scala/spark/util/StatCounter.scala
deleted file mode 100644
index 76358d4..0000000
--- a/core/src/main/scala/spark/util/StatCounter.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-/**
- * A class for tracking the statistics of a set of numbers (count, mean and variance) in a
- * numerically robust way. Includes support for merging two StatCounters. Based on 
- * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance Welford and Chan's algorithms for running variance]].
- *
- * @constructor Initialize the StatCounter with the given values.
- */
-class StatCounter(values: TraversableOnce[Double]) extends Serializable {
-  private var n: Long = 0     // Running count of our values
-  private var mu: Double = 0  // Running mean of our values
-  private var m2: Double = 0  // Running variance numerator (sum of (x - mean)^2)
-
-  merge(values)
-
-  /** Initialize the StatCounter with no values. */
-  def this() = this(Nil)
-
-  /** Add a value into this StatCounter, updating the internal statistics. */
-  def merge(value: Double): StatCounter = {
-    val delta = value - mu
-    n += 1
-    mu += delta / n
-    m2 += delta * (value - mu)
-    this
-  }
-
-  /** Add multiple values into this StatCounter, updating the internal statistics. */
-  def merge(values: TraversableOnce[Double]): StatCounter = {
-    values.foreach(v => merge(v))
-    this
-  }
-
-  /** Merge another StatCounter into this one, adding up the internal statistics. */
-  def merge(other: StatCounter): StatCounter = {
-    if (other == this) {
-      merge(other.copy())  // Avoid overwriting fields in a weird order
-    } else {
-      if (n == 0) {
-        mu = other.mu
-        m2 = other.m2
-        n = other.n       
-      } else if (other.n != 0) {        
-        val delta = other.mu - mu
-        if (other.n * 10 < n) {
-          mu = mu + (delta * other.n) / (n + other.n)
-        } else if (n * 10 < other.n) {
-          mu = other.mu - (delta * n) / (n + other.n)
-        } else {
-          mu = (mu * n + other.mu * other.n) / (n + other.n)
-        }
-        m2 += other.m2 + (delta * delta * n * other.n) / (n + other.n)
-        n += other.n
-      }
-      this	   
-    }
-  }
-
-  /** Clone this StatCounter */
-  def copy(): StatCounter = {
-    val other = new StatCounter
-    other.n = n
-    other.mu = mu
-    other.m2 = m2
-    other
-  }
-
-  def count: Long = n
-
-  def mean: Double = mu
-
-  def sum: Double = n * mu
-
-  /** Return the variance of the values. */
-  def variance: Double = {
-    if (n == 0)
-      Double.NaN
-    else
-      m2 / n
-  }
-
-  /**
-   * Return the sample variance, which corrects for bias in estimating the variance by dividing
-   * by N-1 instead of N.
-   */
-  def sampleVariance: Double = {
-    if (n <= 1)
-      Double.NaN
-    else
-      m2 / (n - 1)
-  }
-
-  /** Return the standard deviation of the values. */
-  def stdev: Double = math.sqrt(variance)
-
-  /**
-   * Return the sample standard deviation of the values, which corrects for bias in estimating the
-   * variance by dividing by N-1 instead of N.
-   */
-  def sampleStdev: Double = math.sqrt(sampleVariance)
-
-  override def toString: String = {
-    "(count: %d, mean: %f, stdev: %f)".format(count, mean, stdev)
-  }
-}
-
-object StatCounter {
-  /** Build a StatCounter from a list of values. */
-  def apply(values: TraversableOnce[Double]) = new StatCounter(values)
-
-  /** Build a StatCounter from a list of values passed as variable-length arguments. */
-  def apply(values: Double*) = new StatCounter(values)
-}
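
A small sketch of the StatCounter API with made-up values, showing both apply overloads and the merge variants:

  import spark.util.StatCounter

  val a = StatCounter(1.0, 2.0, 3.0)       // varargs constructor
  val b = StatCounter(Seq(10.0, 20.0))     // TraversableOnce constructor
  a.merge(4.0)                             // add a single value to a
  val combined = a.copy().merge(b)         // merge two counters without mutating a further
  println("count=" + combined.count + " mean=" + combined.mean + " stdev=" + combined.stdev)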

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/TimeStampedHashMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
deleted file mode 100644
index 07772a0..0000000
--- a/core/src/main/scala/spark/util/TimeStampedHashMap.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConversions
-import scala.collection.mutable.Map
-import scala.collection.immutable
-import spark.scheduler.MapStatus
-
-/**
- * This is a custom implementation of scala.collection.mutable.Map which stores the insertion
- * time stamp along with each key-value pair. Key-value pairs that are older than a particular
- * threshold time can then be removed using the clearOldValues method. This is intended to be a
- * drop-in replacement for scala.collection.mutable.HashMap.
- */
-class TimeStampedHashMap[A, B] extends Map[A, B]() with spark.Logging {
-  val internalMap = new ConcurrentHashMap[A, (B, Long)]()
-
-  def get(key: A): Option[B] = {
-    val value = internalMap.get(key)
-    if (value != null) Some(value._1) else None
-  }
-
-  def iterator: Iterator[(A, B)] = {
-    val jIterator = internalMap.entrySet().iterator()
-    JavaConversions.asScalaIterator(jIterator).map(kv => (kv.getKey, kv.getValue._1))
-  }
-
-  override def + [B1 >: B](kv: (A, B1)): Map[A, B1] = {
-    val newMap = new TimeStampedHashMap[A, B1]
-    newMap.internalMap.putAll(this.internalMap)
-    newMap.internalMap.put(kv._1, (kv._2, currentTime))
-    newMap
-  }
-
-  override def - (key: A): Map[A, B] = {
-    val newMap = new TimeStampedHashMap[A, B]
-    newMap.internalMap.putAll(this.internalMap)
-    newMap.internalMap.remove(key)
-    newMap
-  }
-
-  override def += (kv: (A, B)): this.type = {
-    internalMap.put(kv._1, (kv._2, currentTime))
-    this
-  }
-
-  // Should we return the previous value directly or as an Option?
-  def putIfAbsent(key: A, value: B): Option[B] = {
-    val prev = internalMap.putIfAbsent(key, (value, currentTime))
-    if (prev != null) Some(prev._1) else None
-  }
-
-
-  override def -= (key: A): this.type = {
-    internalMap.remove(key)
-    this
-  }
-
-  override def update(key: A, value: B) {
-    this += ((key, value))
-  }
-
-  override def apply(key: A): B = {
-    val value = internalMap.get(key)
-    if (value == null) throw new NoSuchElementException()
-    value._1
-  }
-
-  override def filter(p: ((A, B)) => Boolean): Map[A, B] = {
-    JavaConversions.asScalaConcurrentMap(internalMap).map(kv => (kv._1, kv._2._1)).filter(p)
-  }
-
-  override def empty: Map[A, B] = new TimeStampedHashMap[A, B]()
-
-  override def size: Int = internalMap.size
-
-  override def foreach[U](f: ((A, B)) => U) {
-    val iterator = internalMap.entrySet().iterator()
-    while(iterator.hasNext) {
-      val entry = iterator.next()
-      val kv = (entry.getKey, entry.getValue._1)
-      f(kv)
-    }
-  }
-
-  def toMap: immutable.Map[A, B] = iterator.toMap
-
-  /**
-   * Removes old key-value pairs that have timestamp earlier than `threshTime`
-   */
-  def clearOldValues(threshTime: Long) {
-    val iterator = internalMap.entrySet().iterator()
-    while(iterator.hasNext) {
-      val entry = iterator.next()
-      if (entry.getValue._2 < threshTime) {
-        logDebug("Removing key " + entry.getKey)
-        iterator.remove()
-      }
-    }
-  }
-
-  private def currentTime: Long = System.currentTimeMillis()
-
-}
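
An illustrative sketch of the timestamp-based expiry (the key names and one-hour threshold are hypothetical):

  import spark.util.TimeStampedHashMap

  val cache = new TimeStampedHashMap[String, Int]()
  cache("stage-1") = 42                      // update() records an insertion timestamp
  cache.putIfAbsent("stage-1", 99)           // returns Some(42) and keeps the existing value
  cache.clearOldValues(System.currentTimeMillis() - 3600 * 1000)   // drop entries older than an hour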

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/TimeStampedHashSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/TimeStampedHashSet.scala b/core/src/main/scala/spark/util/TimeStampedHashSet.scala
deleted file mode 100644
index 41e3fd8..0000000
--- a/core/src/main/scala/spark/util/TimeStampedHashSet.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-import scala.collection.mutable.Set
-import scala.collection.JavaConversions
-import java.util.concurrent.ConcurrentHashMap
-
-
-class TimeStampedHashSet[A] extends Set[A] {
-  val internalMap = new ConcurrentHashMap[A, Long]()
-
-  def contains(key: A): Boolean = {
-    internalMap.contains(key)
-  }
-
-  def iterator: Iterator[A] = {
-    val jIterator = internalMap.entrySet().iterator()
-    JavaConversions.asScalaIterator(jIterator).map(_.getKey)
-  }
-
-  override def + (elem: A): Set[A] = {
-    val newSet = new TimeStampedHashSet[A]
-    newSet ++= this
-    newSet += elem
-    newSet
-  }
-
-  override def - (elem: A): Set[A] = {
-    val newSet = new TimeStampedHashSet[A]
-    newSet ++= this
-    newSet -= elem
-    newSet
-  }
-
-  override def += (key: A): this.type = {
-    internalMap.put(key, currentTime)
-    this
-  }
-
-  override def -= (key: A): this.type = {
-    internalMap.remove(key)
-    this
-  }
-
-  override def empty: Set[A] = new TimeStampedHashSet[A]()
-
-  override def size(): Int = internalMap.size()
-
-  override def foreach[U](f: (A) => U): Unit = {
-    val iterator = internalMap.entrySet().iterator()
-    while(iterator.hasNext) {
-      f(iterator.next.getKey)
-    }
-  }
-
-  /**
-   * Removes old values that have timestamp earlier than `threshTime`
-   */
-  def clearOldValues(threshTime: Long) {
-    val iterator = internalMap.entrySet().iterator()
-    while(iterator.hasNext) {
-      val entry = iterator.next()
-      if (entry.getValue < threshTime) {
-        iterator.remove()
-      }
-    }
-  }
-
-  private def currentTime: Long = System.currentTimeMillis()
-}
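
A matching sketch for the set variant (the element name and ten-minute threshold are made up):

  import spark.util.TimeStampedHashSet

  val seen = new TimeStampedHashSet[String]()
  seen += "shuffle_0_0"                      // timestamps the element on insert
  seen.clearOldValues(System.currentTimeMillis() - 600 * 1000)   // expire elements older than 10 minutes
  println(seen.size)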

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/Vector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/Vector.scala b/core/src/main/scala/spark/util/Vector.scala
deleted file mode 100644
index a47cac3..0000000
--- a/core/src/main/scala/spark/util/Vector.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-class Vector(val elements: Array[Double]) extends Serializable {
-  def length = elements.length
-
-  def apply(index: Int) = elements(index)
-
-  def + (other: Vector): Vector = {
-    if (length != other.length)
-      throw new IllegalArgumentException("Vectors of different length")
-    return Vector(length, i => this(i) + other(i))
-  }
-
-  def add(other: Vector) = this + other
-
-  def - (other: Vector): Vector = {
-    if (length != other.length)
-      throw new IllegalArgumentException("Vectors of different length")
-    return Vector(length, i => this(i) - other(i))
-  }
-
-  def subtract(other: Vector) = this - other
-
-  def dot(other: Vector): Double = {
-    if (length != other.length)
-      throw new IllegalArgumentException("Vectors of different length")
-    var ans = 0.0
-    var i = 0
-    while (i < length) {
-      ans += this(i) * other(i)
-      i += 1
-    }
-    return ans
-  }
-
-  /**
-   * Return (this + plus) dot other, without creating any intermediate storage.
-   * @param plus vector added element-wise to this one before taking the dot product
-   * @param other vector to take the dot product with
-   * @return the resulting dot product
-   */
-  def plusDot(plus: Vector, other: Vector): Double = {
-    if (length != other.length)
-      throw new IllegalArgumentException("Vectors of different length")
-    if (length != plus.length)
-      throw new IllegalArgumentException("Vectors of different length")
-    var ans = 0.0
-    var i = 0
-    while (i < length) {
-      ans += (this(i) + plus(i)) * other(i)
-      i += 1
-    }
-    return ans
-  }
-
-  def += (other: Vector): Vector = {
-    if (length != other.length)
-      throw new IllegalArgumentException("Vectors of different length")
-    var i = 0
-    while (i < length) {
-      elements(i) += other(i)
-      i += 1
-    }
-    this
-  }
-
-  def addInPlace(other: Vector) = this +=other
-
-  def * (scale: Double): Vector = Vector(length, i => this(i) * scale)
-
-  def multiply (d: Double) = this * d
-
-  def / (d: Double): Vector = this * (1 / d)
-
-  def divide (d: Double) = this / d
-
-  def unary_- = this * -1
-
-  def sum = elements.reduceLeft(_ + _)
-
-  def squaredDist(other: Vector): Double = {
-    var ans = 0.0
-    var i = 0
-    while (i < length) {
-      ans += (this(i) - other(i)) * (this(i) - other(i))
-      i += 1
-    }
-    return ans
-  }
-
-  def dist(other: Vector): Double = math.sqrt(squaredDist(other))
-
-  override def toString = elements.mkString("(", ", ", ")")
-}
-
-object Vector {
-  def apply(elements: Array[Double]) = new Vector(elements)
-
-  def apply(elements: Double*) = new Vector(elements.toArray)
-
-  def apply(length: Int, initializer: Int => Double): Vector = {
-    val elements: Array[Double] = Array.tabulate(length)(initializer)
-    return new Vector(elements)
-  }
-
-  def zeros(length: Int) = new Vector(new Array[Double](length))
-
-  def ones(length: Int) = Vector(length, _ => 1)
-
-  class Multiplier(num: Double) {
-    def * (vec: Vector) = vec * num
-  }
-
-  implicit def doubleToMultiplier(num: Double) = new Multiplier(num)
-
-  implicit object VectorAccumParam extends spark.AccumulatorParam[Vector] {
-    def addInPlace(t1: Vector, t2: Vector) = t1 + t2
-
-    def zero(initialValue: Vector) = Vector.zeros(initialValue.length)
-  }
-
-}
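
A quick sketch of the Vector arithmetic (values chosen for illustration); importing the companion's members brings the Double-on-the-left Multiplier implicit into scope:

  import spark.util.Vector
  import spark.util.Vector._

  val v1 = Vector(1.0, 2.0, 3.0)
  val v2 = Vector.ones(3)
  println(v1 + v2)               // (2.0, 3.0, 4.0)
  println(v1 dot v2)             // 6.0
  println(2.0 * v1)              // (2.0, 4.0, 6.0), via doubleToMultiplier
  println(v1.squaredDist(v2))    // 0.0 + 1.0 + 4.0 = 5.0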

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/resources/test_metrics_config.properties
----------------------------------------------------------------------
diff --git a/core/src/test/resources/test_metrics_config.properties b/core/src/test/resources/test_metrics_config.properties
index 2b31ddf..056a158 100644
--- a/core/src/test/resources/test_metrics_config.properties
+++ b/core/src/test/resources/test_metrics_config.properties
@@ -1,6 +1,6 @@
 *.sink.console.period = 10
 *.sink.console.unit = seconds
-*.source.jvm.class = spark.metrics.source.JvmSource
+*.source.jvm.class = org.apache.spark.metrics.source.JvmSource
 master.sink.console.period = 20
 master.sink.console.unit = minutes
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/resources/test_metrics_system.properties
----------------------------------------------------------------------
diff --git a/core/src/test/resources/test_metrics_system.properties b/core/src/test/resources/test_metrics_system.properties
index d5479f0..6f5ecea 100644
--- a/core/src/test/resources/test_metrics_system.properties
+++ b/core/src/test/resources/test_metrics_system.properties
@@ -1,7 +1,7 @@
 *.sink.console.period = 10
 *.sink.console.unit = seconds
-test.sink.console.class = spark.metrics.sink.ConsoleSink
-test.sink.dummy.class = spark.metrics.sink.DummySink
-test.source.dummy.class = spark.metrics.source.DummySource
+test.sink.console.class = org.apache.spark.metrics.sink.ConsoleSink
+test.sink.dummy.class = org.apache.spark.metrics.sink.DummySink
+test.source.dummy.class = org.apache.spark.metrics.source.DummySource
 test.sink.console.period = 20
 test.sink.console.unit = minutes
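
For reference, the metrics keys above follow the pattern [instance].[sink|source].[name].[option]; a minimal sketch of a post-rename configuration (the ConsoleSink period and unit values are arbitrary):

  *.sink.console.class = org.apache.spark.metrics.sink.ConsoleSink
  *.sink.console.period = 10
  *.sink.console.unit = seconds
  *.source.jvm.class = org.apache.spark.metrics.source.JvmSource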