Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:58:57 UTC
[13/69] [abbrv] [partial] Initial work to rename package to org.apache.spark
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
deleted file mode 100644
index 1d9767a..0000000
--- a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-package spark.ui.jobs
-
-import scala.Seq
-import scala.collection.mutable.{ListBuffer, HashMap, HashSet}
-
-import spark.{ExceptionFailure, SparkContext, Success, Utils}
-import spark.scheduler._
-import spark.scheduler.cluster.TaskInfo
-import spark.executor.TaskMetrics
-import collection.mutable
-
-/**
- * Tracks task-level information to be displayed in the UI.
- *
- * All access to the data structures in this class must be synchronized on the
- * class, since the UI thread and the DAGScheduler event loop may otherwise
- * be reading/updating the internal data structures concurrently.
- */
-private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
- // How many stages to remember
- val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
- val DEFAULT_POOL_NAME = "default"
-
- val stageToPool = new HashMap[Stage, String]()
- val stageToDescription = new HashMap[Stage, String]()
- val poolToActiveStages = new HashMap[String, HashSet[Stage]]()
-
- val activeStages = HashSet[Stage]()
- val completedStages = ListBuffer[Stage]()
- val failedStages = ListBuffer[Stage]()
-
- // Total metrics reflect metrics only for completed tasks
- var totalTime = 0L
- var totalShuffleRead = 0L
- var totalShuffleWrite = 0L
-
- val stageToTime = HashMap[Int, Long]()
- val stageToShuffleRead = HashMap[Int, Long]()
- val stageToShuffleWrite = HashMap[Int, Long]()
- val stageToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
- val stageToTasksComplete = HashMap[Int, Int]()
- val stageToTasksFailed = HashMap[Int, Int]()
- val stageToTaskInfos =
- HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
-
- override def onJobStart(jobStart: SparkListenerJobStart) {}
-
- override def onStageCompleted(stageCompleted: StageCompleted) = synchronized {
- val stage = stageCompleted.stageInfo.stage
- poolToActiveStages(stageToPool(stage)) -= stage
- activeStages -= stage
- completedStages += stage
- trimIfNecessary(completedStages)
- }
-
- /** If stages is too large, remove and garbage collect old stages */
- def trimIfNecessary(stages: ListBuffer[Stage]) = synchronized {
- if (stages.size > RETAINED_STAGES) {
- val toRemove = RETAINED_STAGES / 10
- stages.takeRight(toRemove).foreach( s => {
- stageToTaskInfos.remove(s.id)
- stageToTime.remove(s.id)
- stageToShuffleRead.remove(s.id)
- stageToShuffleWrite.remove(s.id)
- stageToTasksActive.remove(s.id)
- stageToTasksComplete.remove(s.id)
- stageToTasksFailed.remove(s.id)
- stageToPool.remove(s)
- if (stageToDescription.contains(s)) {stageToDescription.remove(s)}
- })
- stages.trimEnd(toRemove)
- }
- }
-
- /** For FIFO scheduling, all stages are placed in the "default" pool, though the pool itself is meaningless there. */
- override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized {
- val stage = stageSubmitted.stage
- activeStages += stage
-
- val poolName = Option(stageSubmitted.properties).map {
- p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
- }.getOrElse(DEFAULT_POOL_NAME)
- stageToPool(stage) = poolName
-
- val description = Option(stageSubmitted.properties).flatMap {
- p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
- }
- description.map(d => stageToDescription(stage) = d)
-
- val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
- stages += stage
- }
-
- override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
- val sid = taskStart.task.stageId
- val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
- tasksActive += taskStart.taskInfo
- val taskList = stageToTaskInfos.getOrElse(
- sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
- taskList += ((taskStart.taskInfo, None, None))
- stageToTaskInfos(sid) = taskList
- }
-
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
- val sid = taskEnd.task.stageId
- val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
- tasksActive -= taskEnd.taskInfo
- val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
- taskEnd.reason match {
- case e: ExceptionFailure =>
- stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
- (Some(e), e.metrics)
- case _ =>
- stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
- (None, Option(taskEnd.taskMetrics))
- }
-
- stageToTime.getOrElseUpdate(sid, 0L)
- val time = metrics.map(m => m.executorRunTime).getOrElse(0)
- stageToTime(sid) += time
- totalTime += time
-
- stageToShuffleRead.getOrElseUpdate(sid, 0L)
- val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
- s.remoteBytesRead).getOrElse(0L)
- stageToShuffleRead(sid) += shuffleRead
- totalShuffleRead += shuffleRead
-
- stageToShuffleWrite.getOrElseUpdate(sid, 0L)
- val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
- s.shuffleBytesWritten).getOrElse(0L)
- stageToShuffleWrite(sid) += shuffleWrite
- totalShuffleWrite += shuffleWrite
-
- val taskList = stageToTaskInfos.getOrElse(
- sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
- taskList -= ((taskEnd.taskInfo, None, None))
- taskList += ((taskEnd.taskInfo, metrics, failureInfo))
- stageToTaskInfos(sid) = taskList
- }
-
- override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized {
- jobEnd match {
- case end: SparkListenerJobEnd =>
- end.jobResult match {
- case JobFailed(ex, Some(stage)) =>
- activeStages -= stage
- poolToActiveStages(stageToPool(stage)) -= stage
- failedStages += stage
- trimIfNecessary(failedStages)
- case _ =>
- }
- case _ =>
- }
- }
-}
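
The listener above illustrates the general SparkListener callback pattern: every handler body runs inside `synchronized` because the DAGScheduler event loop and the UI's render threads share the same maps. A minimal sketch of the same pattern against the pre-rename API shown in this diff (the counter class itself is hypothetical; it would be registered via `sc.addSparkListener`, exactly as JobProgressUI.start() does in the next file):

    package spark.demo   // the listener types live in the pre-rename spark.scheduler package

    import scala.collection.mutable.HashMap
    import spark.scheduler.{SparkListener, SparkListenerTaskEnd, StageCompleted}

    // Hypothetical listener that counts finished tasks per stage. As in the
    // UI listener above, every handler synchronizes on the instance.
    class TaskCountListener extends SparkListener {
      val tasksEnded = new HashMap[Int, Int]()

      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
        val sid = taskEnd.task.stageId
        tasksEnded(sid) = tasksEnded.getOrElse(sid, 0) + 1
      }

      override def onStageCompleted(stageCompleted: StageCompleted) = synchronized {
        tasksEnded.remove(stageCompleted.stageInfo.stage.id)
      }
    }
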
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
deleted file mode 100644
index c83f102..0000000
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.ui.jobs
-
-import akka.util.Duration
-
-import java.text.SimpleDateFormat
-
-import javax.servlet.http.HttpServletRequest
-
-import org.eclipse.jetty.server.Handler
-
-import scala.Seq
-import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
-
-import spark.ui.JettyUtils._
-import spark.{ExceptionFailure, SparkContext, Success, Utils}
-import spark.scheduler._
-import collection.mutable
-import spark.scheduler.cluster.SchedulingMode
-import spark.scheduler.cluster.SchedulingMode.SchedulingMode
-
-/** Web UI showing progress status of all jobs in the given SparkContext. */
-private[spark] class JobProgressUI(val sc: SparkContext) {
- private var _listener: Option[JobProgressListener] = None
- def listener = _listener.get
- val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
-
- private val indexPage = new IndexPage(this)
- private val stagePage = new StagePage(this)
- private val poolPage = new PoolPage(this)
-
- def start() {
- _listener = Some(new JobProgressListener(sc))
- sc.addSparkListener(listener)
- }
-
- def formatDuration(ms: Long) = Utils.msDurationToString(ms)
-
- def getHandlers = Seq[(String, Handler)](
- ("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)),
- ("/stages/pool", (request: HttpServletRequest) => poolPage.render(request)),
- ("/stages", (request: HttpServletRequest) => indexPage.render(request))
- )
-}
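
The UI's memory footprint is bounded by the `spark.ui.retained_stages` property read in JobProgressListener above; it is consulted when the SparkContext starts the UI, so it has to be set beforehand. A minimal sketch:

    object UIConfigDemo extends App {
      // Cap on remembered stages, read by JobProgressListener above (default 1000).
      // Must be set before the SparkContext, and hence the UI, is created.
      System.setProperty("spark.ui.retained_stages", "5000")
      val sc = new spark.SparkContext("local", "ui-demo")
    }
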
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/ui/jobs/PoolPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
deleted file mode 100644
index 7fb74dc..0000000
--- a/core/src/main/scala/spark/ui/jobs/PoolPage.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-package spark.ui.jobs
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.{NodeSeq, Node}
-import scala.collection.mutable.HashSet
-
-import spark.scheduler.Stage
-import spark.ui.UIUtils._
-import spark.ui.Page._
-
-/** Page showing specific pool details */
-private[spark] class PoolPage(parent: JobProgressUI) {
- def listener = parent.listener
-
- def render(request: HttpServletRequest): Seq[Node] = {
- listener.synchronized {
- val poolName = request.getParameter("poolname")
- val poolToActiveStages = listener.poolToActiveStages
- val activeStages = poolToActiveStages.get(poolName).toSeq.flatten
- val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
-
- val pool = listener.sc.getPoolForName(poolName).get
- val poolTable = new PoolTable(Seq(pool), listener)
-
- val content = <h4>Summary </h4> ++ poolTable.toNodeSeq() ++
- <h4>{activeStages.size} Active Stages</h4> ++ activeStagesTable.toNodeSeq()
-
- headerSparkPage(content, parent.sc, "Fair Scheduler Pool: " + poolName, Stages)
- }
- }
-}
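
PoolPage looks stages up by the pool name recorded in JobProgressListener.onStageSubmitted, which reads the `spark.scheduler.cluster.fair.pool` key from the stage's submission properties. A sketch of how a job would opt into a pool, assuming the 0.8-era `SparkContext.setLocalProperty` API (the pool name is hypothetical):

    object PoolDemo extends App {
      val sc = new spark.SparkContext("local", "pool-demo")
      // Property key as read by JobProgressListener.onStageSubmitted above;
      // setLocalProperty is assumed here, not shown in this patch.
      sc.setLocalProperty("spark.scheduler.cluster.fair.pool", "production")
    }
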
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/ui/jobs/PoolTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
deleted file mode 100644
index 621828f..0000000
--- a/core/src/main/scala/spark/ui/jobs/PoolTable.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-package spark.ui.jobs
-
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-import scala.xml.Node
-
-import spark.scheduler.Stage
-import spark.scheduler.cluster.Schedulable
-
-/** Table showing list of pools */
-private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressListener) {
-
- var poolToActiveStages: HashMap[String, HashSet[Stage]] = listener.poolToActiveStages
-
- def toNodeSeq(): Seq[Node] = {
- listener.synchronized {
- poolTable(poolRow, pools)
- }
- }
-
- private def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node],
- rows: Seq[Schedulable]
- ): Seq[Node] = {
- <table class="table table-bordered table-striped table-condensed sortable table-fixed">
- <thead>
- <th>Pool Name</th>
- <th>Minimum Share</th>
- <th>Pool Weight</th>
- <th>Active Stages</th>
- <th>Running Tasks</th>
- <th>SchedulingMode</th>
- </thead>
- <tbody>
- {rows.map(r => makeRow(r, poolToActiveStages))}
- </tbody>
- </table>
- }
-
- private def poolRow(p: Schedulable, poolToActiveStages: HashMap[String, HashSet[Stage]])
- : Seq[Node] = {
- val activeStages = poolToActiveStages.get(p.name) match {
- case Some(stages) => stages.size
- case None => 0
- }
- <tr>
- <td><a href={"/stages/pool?poolname=%s".format(p.name)}>{p.name}</a></td>
- <td>{p.minShare}</td>
- <td>{p.weight}</td>
- <td>{activeStages}</td>
- <td>{p.runningTasks}</td>
- <td>{p.schedulingMode}</td>
- </tr>
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
deleted file mode 100644
index c234147..0000000
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.ui.jobs
-
-import java.util.Date
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
-
-import spark.ui.UIUtils._
-import spark.ui.Page._
-import spark.util.Distribution
-import spark.{ExceptionFailure, Utils}
-import spark.scheduler.cluster.TaskInfo
-import spark.executor.TaskMetrics
-
-/** Page showing statistics and task list for a given stage */
-private[spark] class StagePage(parent: JobProgressUI) {
- def listener = parent.listener
- val dateFmt = parent.dateFmt
-
- def render(request: HttpServletRequest): Seq[Node] = {
- listener.synchronized {
- val stageId = request.getParameter("id").toInt
- val now = System.currentTimeMillis()
-
- if (!listener.stageToTaskInfos.contains(stageId)) {
- val content =
- <div>
- <h4>Summary Metrics</h4> No tasks have started yet
- <h4>Tasks</h4> No tasks have started yet
- </div>
- return headerSparkPage(content, parent.sc, "Details for Stage %s".format(stageId), Stages)
- }
-
- val tasks = listener.stageToTaskInfos(stageId).toSeq.sortBy(_._1.launchTime)
-
- val numCompleted = tasks.count(_._1.finished)
- val shuffleReadBytes = listener.stageToShuffleRead.getOrElse(stageId, 0L)
- val hasShuffleRead = shuffleReadBytes > 0
- val shuffleWriteBytes = listener.stageToShuffleWrite.getOrElse(stageId, 0L)
- val hasShuffleWrite = shuffleWriteBytes > 0
-
- var activeTime = 0L
- listener.stageToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
-
- val summary =
- <div>
- <ul class="unstyled">
- <li>
- <strong>CPU time: </strong>
- {parent.formatDuration(listener.stageToTime.getOrElse(stageId, 0L) + activeTime)}
- </li>
- {if (hasShuffleRead)
- <li>
- <strong>Shuffle read: </strong>
- {Utils.bytesToString(shuffleReadBytes)}
- </li>
- }
- {if (hasShuffleWrite)
- <li>
- <strong>Shuffle write: </strong>
- {Utils.bytesToString(shuffleWriteBytes)}
- </li>
- }
- </ul>
- </div>
-
- val taskHeaders: Seq[String] =
- Seq("Task ID", "Status", "Locality Level", "Executor", "Launch Time", "Duration") ++
- Seq("GC Time") ++
- {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
- {if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++
- Seq("Errors")
-
- val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
-
- // Excludes tasks which failed and have incomplete metrics
- val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined))
-
- val summaryTable: Option[Seq[Node]] =
- if (validTasks.size == 0) {
- None
- }
- else {
- val serviceTimes = validTasks.map{case (info, metrics, exception) =>
- metrics.get.executorRunTime.toDouble}
- val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
- ms => parent.formatDuration(ms.toLong))
-
- def getQuantileCols(data: Seq[Double]) =
- Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong))
-
- val shuffleReadSizes = validTasks.map {
- case(info, metrics, exception) =>
- metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
- }
- val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes)
-
- val shuffleWriteSizes = validTasks.map {
- case(info, metrics, exception) =>
- metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
- }
- val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
-
- val listings: Seq[Seq[String]] = Seq(serviceQuantiles,
- if (hasShuffleRead) shuffleReadQuantiles else Nil,
- if (hasShuffleWrite) shuffleWriteQuantiles else Nil)
-
- val quantileHeaders = Seq("Metric", "Min", "25th percentile",
- "Median", "75th percentile", "Max")
- def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
- Some(listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true))
- }
-
- val content =
- summary ++
- <h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
- <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
- <h4>Tasks</h4> ++ taskTable;
-
- headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages)
- }
- }
-
-
- def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean)
- (taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = {
- def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
- trace.map(e => <span style="display:block;">{e.toString}</span>)
- val (info, metrics, exception) = taskData
-
- val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
- else metrics.map(m => m.executorRunTime).getOrElse(1)
- val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
- else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
- val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
-
- <tr>
- <td>{info.taskId}</td>
- <td>{info.status}</td>
- <td>{info.taskLocality}</td>
- <td>{info.host}</td>
- <td>{dateFmt.format(new Date(info.launchTime))}</td>
- <td sorttable_customkey={duration.toString}>
- {formatDuration}
- </td>
- <td sorttable_customkey={gcTime.toString}>
- {if (gcTime > 0) parent.formatDuration(gcTime) else ""}
- </td>
- {if (shuffleRead) {
- <td>{metrics.flatMap{m => m.shuffleReadMetrics}.map{s =>
- Utils.bytesToString(s.remoteBytesRead)}.getOrElse("")}</td>
- }}
- {if (shuffleWrite) {
- <td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
- Utils.bytesToString(s.shuffleBytesWritten)}.getOrElse("")}</td>
- }}
- <td>{exception.map(e =>
- <span>
- {e.className} ({e.description})<br/>
- {fmtStackTrace(e.stackTrace)}
- </span>).getOrElse("")}
- </td>
- </tr>
- }
-}
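
The summary table above is built from `spark.util.Distribution` (deleted further down in this patch): task durations and shuffle sizes go in, five quantiles come out. The same call sequence on toy data, as a sketch:

    import spark.util.Distribution

    object QuantileDemo extends App {
      // Hypothetical executorRunTime samples, in milliseconds.
      val serviceTimes = Seq(120.0, 340.0, 95.0, 210.0)
      // Same sequence as StagePage: the .get is safe because the input is
      // non-empty; StagePage guards this with its validTasks check.
      val quantiles = Distribution(serviceTimes).get.getQuantiles()
      println(quantiles.mkString("\t"))   // min, 25th, median, 75th, max
    }
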
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala
deleted file mode 100644
index 2b1bc98..0000000
--- a/core/src/main/scala/spark/ui/jobs/StageTable.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-package spark.ui.jobs
-
-import java.util.Date
-
-import scala.xml.Node
-import scala.collection.mutable.HashSet
-
-import spark.Utils
-import spark.scheduler.cluster.{SchedulingMode, TaskInfo}
-import spark.scheduler.Stage
-
-
-/** Page showing list of all ongoing and recently finished stages */
-private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressUI) {
-
- val listener = parent.listener
- val dateFmt = parent.dateFmt
- val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
-
- def toNodeSeq(): Seq[Node] = {
- listener.synchronized {
- stageTable(stageRow, stages)
- }
- }
-
- /** Special table which merges two header cells. */
- private def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
- <table class="table table-bordered table-striped table-condensed sortable">
- <thead>
- <th>Stage Id</th>
- {if (isFairScheduler) {<th>Pool Name</th>} else {}}
- <th>Description</th>
- <th>Submitted</th>
- <th>Duration</th>
- <th>Tasks: Succeeded/Total</th>
- <th>Shuffle Read</th>
- <th>Shuffle Write</th>
- </thead>
- <tbody>
- {rows.map(r => makeRow(r))}
- </tbody>
- </table>
- }
-
- private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] = {
- val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
- val startWidth = "width: %s%%".format((started.toDouble/total)*100)
-
- <div class="progress">
- <span style="text-align:center; position:absolute; width:100%;">
- {completed}/{total} {failed}
- </span>
- <div class="bar bar-completed" style={completeWidth}></div>
- <div class="bar bar-running" style={startWidth}></div>
- </div>
- }
-
-
- private def stageRow(s: Stage): Seq[Node] = {
- val submissionTime = s.submissionTime match {
- case Some(t) => dateFmt.format(new Date(t))
- case None => "Unknown"
- }
-
- val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match {
- case 0 => ""
- case b => Utils.bytesToString(b)
- }
- val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match {
- case 0 => ""
- case b => Utils.bytesToString(b)
- }
-
- val startedTasks = listener.stageToTasksActive.getOrElse(s.id, HashSet[TaskInfo]()).size
- val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
- val failedTasks = listener.stageToTasksFailed.getOrElse(s.id, 0) match {
- case f if f > 0 => "(%s failed)".format(f)
- case _ => ""
- }
- val totalTasks = s.numPartitions
-
- val poolName = listener.stageToPool.get(s)
-
- val nameLink = <a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a>
- val description = listener.stageToDescription.get(s)
- .map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
- val finishTime = s.completionTime.getOrElse(System.currentTimeMillis())
- val duration = s.submissionTime.map(t => finishTime - t)
-
- <tr>
- <td>{s.id}</td>
- {if (isFairScheduler) {
- <td><a href={"/stages/pool?poolname=%s".format(poolName.get)}>{poolName.get}</a></td>}
- }
- <td>{description}</td>
- <td valign="middle">{submissionTime}</td>
- <td sorttable_customkey={duration.getOrElse(-1).toString}>
- {duration.map(d => parent.formatDuration(d)).getOrElse("Unknown")}
- </td>
- <td class="progress-cell">
- {makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)}
- </td>
- <td>{shuffleRead}</td>
- <td>{shuffleWrite}</td>
- </tr>
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala
deleted file mode 100644
index 49ed069..0000000
--- a/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.ui.storage
-
-import akka.util.Duration
-
-import javax.servlet.http.HttpServletRequest
-
-import org.eclipse.jetty.server.Handler
-
-import spark.{Logging, SparkContext}
-import spark.ui.JettyUtils._
-
-/** Web UI showing storage status of all RDD's in the given SparkContext. */
-private[spark] class BlockManagerUI(val sc: SparkContext) extends Logging {
- implicit val timeout = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
-
- val indexPage = new IndexPage(this)
- val rddPage = new RDDPage(this)
-
- def getHandlers = Seq[(String, Handler)](
- ("/storage/rdd", (request: HttpServletRequest) => rddPage.render(request)),
- ("/storage", (request: HttpServletRequest) => indexPage.render(request))
- )
-}
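
The implicit `timeout` above caps how long the storage pages wait on BlockManager state; it reads the `spark.akka.askTimeout` system property (in seconds, default 10). On a slow or busy cluster it can be raised like any JVM property:

    // Either on the JVM command line:  -Dspark.akka.askTimeout=30
    // or programmatically, before the SparkContext is created:
    System.setProperty("spark.akka.askTimeout", "30")
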
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/ui/storage/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/ui/storage/IndexPage.scala b/core/src/main/scala/spark/ui/storage/IndexPage.scala
deleted file mode 100644
index fc6273c..0000000
--- a/core/src/main/scala/spark/ui/storage/IndexPage.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.ui.storage
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
-
-import spark.storage.{RDDInfo, StorageUtils}
-import spark.Utils
-import spark.ui.UIUtils._
-import spark.ui.Page._
-
-/** Page showing list of RDD's currently stored in the cluster */
-private[spark] class IndexPage(parent: BlockManagerUI) {
- val sc = parent.sc
-
- def render(request: HttpServletRequest): Seq[Node] = {
- val storageStatusList = sc.getExecutorStorageStatus
- // Calculate macro-level statistics
-
- val rddHeaders = Seq(
- "RDD Name",
- "Storage Level",
- "Cached Partitions",
- "Fraction Cached",
- "Size in Memory",
- "Size on Disk")
- val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
- val content = listingTable(rddHeaders, rddRow, rdds)
-
- headerSparkPage(content, parent.sc, "Storage ", Storage)
- }
-
- def rddRow(rdd: RDDInfo): Seq[Node] = {
- <tr>
- <td>
- <a href={"/storage/rdd?id=%s".format(rdd.id)}>
- {rdd.name}
- </a>
- </td>
- <td>{rdd.storageLevel.description}
- </td>
- <td>{rdd.numCachedPartitions}</td>
- <td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td>
- <td>{Utils.bytesToString(rdd.memSize)}</td>
- <td>{Utils.bytesToString(rdd.diskSize)}</td>
- </tr>
- }
-}
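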
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/ui/storage/RDDPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/ui/storage/RDDPage.scala b/core/src/main/scala/spark/ui/storage/RDDPage.scala
deleted file mode 100644
index b128a56..0000000
--- a/core/src/main/scala/spark/ui/storage/RDDPage.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.ui.storage
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
-
-import spark.Utils
-import spark.storage.{StorageStatus, StorageUtils}
-import spark.storage.BlockManagerMasterActor.BlockStatus
-import spark.ui.UIUtils._
-import spark.ui.Page._
-
-
-/** Page showing storage details for a given RDD */
-private[spark] class RDDPage(parent: BlockManagerUI) {
- val sc = parent.sc
-
- def render(request: HttpServletRequest): Seq[Node] = {
- val id = request.getParameter("id")
- val prefix = "rdd_" + id.toString
- val storageStatusList = sc.getExecutorStorageStatus
- val filteredStorageStatusList = StorageUtils.
- filterStorageStatusByPrefix(storageStatusList, prefix)
- val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
-
- val workerHeaders = Seq("Host", "Memory Usage", "Disk Usage")
- val workers = filteredStorageStatusList.map((prefix, _))
- val workerTable = listingTable(workerHeaders, workerRow, workers)
-
- val blockHeaders = Seq("Block Name", "Storage Level", "Size in Memory", "Size on Disk",
- "Executors")
-
- val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1)
- val blockLocations = StorageUtils.blockLocationsFromStorageStatus(filteredStorageStatusList)
- val blocks = blockStatuses.map {
- case(id, status) => (id, status, blockLocations.get(id).getOrElse(Seq("UNKNOWN")))
- }
- val blockTable = listingTable(blockHeaders, blockRow, blocks)
-
- val content =
- <div class="row-fluid">
- <div class="span12">
- <ul class="unstyled">
- <li>
- <strong>Storage Level:</strong>
- {rddInfo.storageLevel.description}
- </li>
- <li>
- <strong>Cached Partitions:</strong>
- {rddInfo.numCachedPartitions}
- </li>
- <li>
- <strong>Total Partitions:</strong>
- {rddInfo.numPartitions}
- </li>
- <li>
- <strong>Memory Size:</strong>
- {Utils.bytesToString(rddInfo.memSize)}
- </li>
- <li>
- <strong>Disk Size:</strong>
- {Utils.bytesToString(rddInfo.diskSize)}
- </li>
- </ul>
- </div>
- </div>
-
- <div class="row-fluid">
- <div class="span12">
- <h4> Data Distribution on {workers.size} Executors </h4>
- {workerTable}
- </div>
- </div>
-
- <div class="row-fluid">
- <div class="span12">
- <h4> {blocks.size} Partitions </h4>
- {blockTable}
- </div>
- </div>;
-
- headerSparkPage(content, parent.sc, "RDD Storage Info for " + rddInfo.name, Storage)
- }
-
- def blockRow(row: (String, BlockStatus, Seq[String])): Seq[Node] = {
- val (id, block, locations) = row
- <tr>
- <td>{id}</td>
- <td>
- {block.storageLevel.description}
- </td>
- <td sorttable_customkey={block.memSize.toString}>
- {Utils.bytesToString(block.memSize)}
- </td>
- <td sorttable_customkey={block.diskSize.toString}>
- {Utils.bytesToString(block.diskSize)}
- </td>
- <td>
- {locations.map(l => <span>{l}<br/></span>)}
- </td>
- </tr>
- }
-
- def workerRow(worker: (String, StorageStatus)): Seq[Node] = {
- val (prefix, status) = worker
- <tr>
- <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td>
- <td>
- {Utils.bytesToString(status.memUsed(prefix))}
- ({Utils.bytesToString(status.memRemaining)} Remaining)
- </td>
- <td>{Utils.bytesToString(status.diskUsed(prefix))}</td>
- </tr>
- }
-}
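
RDDPage recovers an RDD's blocks purely by name: cached partitions are stored under block ids of the form "rdd_<rddId>_<partitionIndex>", so filtering storage status by the prefix "rdd_<id>" (as in `render` above) finds all of them. A sketch of the convention (the helper is hypothetical):

    // Block naming convention relied on by RDDPage's prefix filter above.
    def rddBlockName(rddId: Int, partition: Int): String =
      "rdd_" + rddId + "_" + partition
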
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
deleted file mode 100644
index 9233277..0000000
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-import akka.actor.{ActorSystem, ExtendedActorSystem}
-import com.typesafe.config.ConfigFactory
-import akka.util.duration._
-import akka.remote.RemoteActorRefProvider
-
-
-/**
- * Various utility classes for working with Akka.
- */
-private[spark] object AkkaUtils {
-
- /**
- * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
- * ActorSystem itself and its port (which is hard to get from Akka).
- *
- * Note: the `name` parameter is important, as even if a client sends a message to the right
- * host + port, if the system name is incorrect, Akka will drop the message.
- */
- def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
- val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
- val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
- val akkaTimeout = System.getProperty("spark.akka.timeout", "60").toInt
- val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
- val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
- // 10 seconds is the default Akka timeout, but in a cluster we need a higher value by default.
- val akkaWriteTimeout = System.getProperty("spark.akka.writeTimeout", "30").toInt
-
- val akkaConf = ConfigFactory.parseString("""
- akka.daemonic = on
- akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
- akka.stdout-loglevel = "ERROR"
- akka.actor.provider = "akka.remote.RemoteActorRefProvider"
- akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
- akka.remote.netty.hostname = "%s"
- akka.remote.netty.port = %d
- akka.remote.netty.connection-timeout = %ds
- akka.remote.netty.message-frame-size = %d MiB
- akka.remote.netty.execution-pool-size = %d
- akka.actor.default-dispatcher.throughput = %d
- akka.remote.log-remote-lifecycle-events = %s
- akka.remote.netty.write-timeout = %ds
- """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize,
- lifecycleEvents, akkaWriteTimeout))
-
- val actorSystem = ActorSystem(name, akkaConf)
-
- // Figure out the port number we bound to, in case port was passed as 0. This is a bit of a
- // hack because Akka doesn't let you figure out the port through the public API yet.
- val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
- val boundPort = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get
- return (actorSystem, boundPort)
- }
-}
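
`createActorSystem` exists mainly for the ephemeral-port pattern: bind to port 0 and recover the port Akka actually chose, which Akka 2.0 does not expose through its public API. A minimal usage sketch:

    package spark.demo   // AkkaUtils is private[spark], so live under the spark package

    import spark.util.AkkaUtils

    object AkkaDemo extends App {
      // Port 0 asks the OS for a free port; the actually-bound port is returned.
      val (actorSystem, boundPort) = AkkaUtils.createActorSystem("driver", "localhost", 0)
      println("bound to port " + boundPort)
      actorSystem.shutdown()
    }
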
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/BoundedPriorityQueue.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/BoundedPriorityQueue.scala b/core/src/main/scala/spark/util/BoundedPriorityQueue.scala
deleted file mode 100644
index 0575497..0000000
--- a/core/src/main/scala/spark/util/BoundedPriorityQueue.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-import java.io.Serializable
-import java.util.{PriorityQueue => JPriorityQueue}
-import scala.collection.generic.Growable
-import scala.collection.JavaConverters._
-
-/**
- * Bounded priority queue. This class wraps the original PriorityQueue
- * class and modifies it such that only the top K elements are retained.
- * The top K elements are defined by an implicit Ordering[A].
- */
-class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
- extends Iterable[A] with Growable[A] with Serializable {
-
- private val underlying = new JPriorityQueue[A](maxSize, ord)
-
- override def iterator: Iterator[A] = underlying.iterator.asScala
-
- override def ++=(xs: TraversableOnce[A]): this.type = {
- xs.foreach { this += _ }
- this
- }
-
- override def +=(elem: A): this.type = {
- if (size < maxSize) underlying.offer(elem)
- else maybeReplaceLowest(elem)
- this
- }
-
- override def +=(elem1: A, elem2: A, elems: A*): this.type = {
- this += elem1 += elem2 ++= elems
- }
-
- override def clear() { underlying.clear() }
-
- private def maybeReplaceLowest(a: A): Boolean = {
- val head = underlying.peek()
- if (head != null && ord.gt(a, head)) {
- underlying.poll()
- underlying.offer(a)
- } else false
- }
-}
-
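
Because `+=` either offers (while under capacity) or calls `maybeReplaceLowest`, the queue keeps exactly the top `maxSize` elements under the implicit ordering. A quick sketch:

    import spark.util.BoundedPriorityQueue

    object TopKDemo extends App {
      val top3 = new BoundedPriorityQueue[Int](3)   // keeps the 3 largest Ints
      top3 ++= Seq(5, 1, 9, 7, 3)                   // 1 is evicted by 7; 3 never enters
      println(top3.toSeq.sorted)                    // List(5, 7, 9)
    }
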
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/ByteBufferInputStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/ByteBufferInputStream.scala b/core/src/main/scala/spark/util/ByteBufferInputStream.scala
deleted file mode 100644
index 47a28e2..0000000
--- a/core/src/main/scala/spark/util/ByteBufferInputStream.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-import java.io.InputStream
-import java.nio.ByteBuffer
-import spark.storage.BlockManager
-
-/**
- * Reads data from a ByteBuffer, and optionally cleans it up using BlockManager.dispose()
- * at the end of the stream (e.g. to close a memory-mapped file).
- */
-private[spark]
-class ByteBufferInputStream(private var buffer: ByteBuffer, dispose: Boolean = false)
- extends InputStream {
-
- override def read(): Int = {
- if (buffer == null || buffer.remaining() == 0) {
- cleanUp()
- -1
- } else {
- buffer.get() & 0xFF
- }
- }
-
- override def read(dest: Array[Byte]): Int = {
- read(dest, 0, dest.length)
- }
-
- override def read(dest: Array[Byte], offset: Int, length: Int): Int = {
- if (buffer == null || buffer.remaining() == 0) {
- cleanUp()
- -1
- } else {
- val amountToGet = math.min(buffer.remaining(), length)
- buffer.get(dest, offset, amountToGet)
- amountToGet
- }
- }
-
- override def skip(bytes: Long): Long = {
- if (buffer != null) {
- val amountToSkip = math.min(bytes, buffer.remaining).toInt
- buffer.position(buffer.position + amountToSkip)
- if (buffer.remaining() == 0) {
- cleanUp()
- }
- amountToSkip
- } else {
- 0L
- }
- }
-
- /**
- * Clean up the buffer, and potentially dispose of it using BlockManager.dispose().
- */
- private def cleanUp() {
- if (buffer != null) {
- if (dispose) {
- BlockManager.dispose(buffer)
- }
- buffer = null
- }
- }
-}
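
A short sketch of the stream over a plain heap buffer; with `dispose = true` the final read would additionally hand the buffer to `BlockManager.dispose()` (useful for memory-mapped files):

    package spark.demo   // ByteBufferInputStream is private[spark]

    import java.nio.ByteBuffer
    import spark.util.ByteBufferInputStream

    object BufDemo extends App {
      val in = new ByteBufferInputStream(ByteBuffer.wrap("hello".getBytes("UTF-8")))
      val dest = new Array[Byte](5)
      in.read(dest)                        // returns 5, filling dest
      println(new String(dest, "UTF-8"))   // hello
      println(in.read())                   // -1: stream exhausted, buffer released
    }
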
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/Clock.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/Clock.scala b/core/src/main/scala/spark/util/Clock.scala
deleted file mode 100644
index aa71a5b..0000000
--- a/core/src/main/scala/spark/util/Clock.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-/**
- * An interface to represent clocks, so that they can be mocked out in unit tests.
- */
-private[spark] trait Clock {
- def getTime(): Long
-}
-
-private[spark] object SystemClock extends Clock {
- def getTime(): Long = System.currentTimeMillis()
-}
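
The trait exists so time can be mocked in unit tests; a deterministic clock is a few lines (this test helper is hypothetical, not part of the patch):

    package spark.demo   // Clock is private[spark]

    import spark.util.Clock

    // A deterministic clock for tests: time advances only when told to.
    class ManualClock(private var now: Long = 0L) extends Clock {
      def getTime(): Long = now
      def advance(ms: Long) { now += ms }
    }
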
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/CompletionIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/CompletionIterator.scala b/core/src/main/scala/spark/util/CompletionIterator.scala
deleted file mode 100644
index 2104508..0000000
--- a/core/src/main/scala/spark/util/CompletionIterator.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-/**
- * Wrapper around an iterator which calls a completion method after it successfully iterates through all the elements
- */
-abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{
- def next = sub.next
- def hasNext = {
- val r = sub.hasNext
- if (!r) {
- completion
- }
- r
- }
-
- def completion()
-}
-
-object CompletionIterator {
- def apply[A, I <: Iterator[A]](sub: I, completionFunction: => Unit) : CompletionIterator[A,I] = {
- new CompletionIterator[A,I](sub) {
- def completion() = completionFunction
- }
- }
-}
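
The companion's `apply` takes the completion action as a by-name argument, so resource cleanup reads naturally at the call site. A sketch (the file path is hypothetical):

    import spark.util.CompletionIterator

    object CompletionDemo extends App {
      val source = scala.io.Source.fromFile("/tmp/data.txt")
      // source.close() runs exactly once, when hasNext first returns false.
      val lines = CompletionIterator[String, Iterator[String]](source.getLines(), source.close())
      lines.foreach(println)
    }
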
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/Distribution.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/Distribution.scala b/core/src/main/scala/spark/util/Distribution.scala
deleted file mode 100644
index 5d4d7a6..0000000
--- a/core/src/main/scala/spark/util/Distribution.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-import java.io.PrintStream
-
-/**
- * Util for getting some stats from a small sample of numeric values, with some handy summary functions.
- *
- * Entirely in memory, not intended as a good way to compute stats over large data sets.
- *
- * Assumes you are giving it a non-empty set of data
- */
-class Distribution(val data: Array[Double], val startIdx: Int, val endIdx: Int) {
- require(startIdx < endIdx)
- def this(data: Traversable[Double]) = this(data.toArray, 0, data.size)
- java.util.Arrays.sort(data, startIdx, endIdx)
- val length = endIdx - startIdx
-
- val defaultProbabilities = Array(0,0.25,0.5,0.75,1.0)
-
- /**
- * Get the value of the distribution at the given probabilities. Probabilities should be
- * given from 0 to 1
- * @param probabilities
- */
- def getQuantiles(probabilities: Traversable[Double] = defaultProbabilities) = {
- probabilities.toIndexedSeq.map{p:Double => data(closestIndex(p))}
- }
-
- private def closestIndex(p: Double) = {
- math.min((p * length).toInt + startIdx, endIdx - 1)
- }
-
- def showQuantiles(out: PrintStream = System.out) = {
- out.println("min\t25%\t50%\t75%\tmax")
- getQuantiles(defaultProbabilities).foreach{q => out.print(q + "\t")}
- out.println
- }
-
- def statCounter = StatCounter(data.slice(startIdx, endIdx))
-
- /**
- * print a summary of this distribution to the given PrintStream.
- * @param out
- */
- def summary(out: PrintStream = System.out) {
- out.println(statCounter)
- showQuantiles(out)
- }
-}
-
-object Distribution {
-
- def apply(data: Traversable[Double]): Option[Distribution] = {
- if (data.size > 0)
- Some(new Distribution(data))
- else
- None
- }
-
- def showQuantiles(out: PrintStream = System.out, quantiles: Traversable[Double]) {
- out.println("min\t25%\t50%\t75%\tmax")
- quantiles.foreach{q => out.print(q + "\t")}
- out.println
- }
-}
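
Beyond `getQuantiles`, the class bundles a `StatCounter` and console helpers; a sketch of the summary path:

    import spark.util.Distribution

    object DistDemo extends App {
      // Distribution() returns None for empty input, hence the .get.
      val d = Distribution(Seq(1.0, 2.0, 3.0, 4.0, 5.0)).get
      d.showQuantiles()      // prints min/25%/50%/75%/max, tab-separated
      println(d.statCounter) // count, mean, stdev via StatCounter
    }
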
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/IdGenerator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/IdGenerator.scala b/core/src/main/scala/spark/util/IdGenerator.scala
deleted file mode 100644
index 3422280..0000000
--- a/core/src/main/scala/spark/util/IdGenerator.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-import java.util.concurrent.atomic.AtomicInteger
-
-/**
- * A util used to get a unique generation ID. This is a wrapper around Java's
- * AtomicInteger. An example usage is in BlockManager, where each BlockManager
- * instance would start an Akka actor and we use this utility to assign the Akka
- * actors unique names.
- */
-private[spark] class IdGenerator {
- private var id = new AtomicInteger
- def next: Int = id.incrementAndGet
-}
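
Since the wrapped AtomicInteger starts at zero and `next` calls `incrementAndGet`, the first id handed out is 1. A sketch:

    package spark.demo   // IdGenerator is private[spark]

    import spark.util.IdGenerator

    object IdDemo extends App {
      val gen = new IdGenerator
      println(gen.next)   // 1
      println(gen.next)   // 2
    }
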
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/IntParam.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/IntParam.scala b/core/src/main/scala/spark/util/IntParam.scala
deleted file mode 100644
index daf0d58..0000000
--- a/core/src/main/scala/spark/util/IntParam.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-/**
- * An extractor object for parsing strings into integers.
- */
-private[spark] object IntParam {
- def unapply(str: String): Option[Int] = {
- try {
- Some(str.toInt)
- } catch {
- case e: NumberFormatException => None
- }
- }
-}
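
As an extractor, IntParam lets argument parsers pattern-match numeric strings directly; a sketch (the parsing function is hypothetical):

    package spark.demo   // IntParam is private[spark]

    import spark.util.IntParam

    object ArgsDemo {
      // A numeric string matches directly inside the pattern; bad input falls through.
      def parsePort(args: Array[String]): Int = args match {
        case Array("--port", IntParam(port)) => port   // e.g. "8080" binds port = 8080
        case _ => sys.error("usage: --port <number>")
      }
    }
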
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/MemoryParam.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/MemoryParam.scala b/core/src/main/scala/spark/util/MemoryParam.scala
deleted file mode 100644
index 2985623..0000000
--- a/core/src/main/scala/spark/util/MemoryParam.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-import spark.Utils
-
-/**
- * An extractor object for parsing JVM memory strings, such as "10g", into an Int representing
- * the number of megabytes. Supports the same formats as Utils.memoryStringToMb.
- */
-private[spark] object MemoryParam {
- def unapply(str: String): Option[Int] = {
- try {
- Some(Utils.memoryStringToMb(str))
- } catch {
- case e: NumberFormatException => None
- }
- }
-}
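
MemoryParam works the same way for JVM-style memory strings, yielding megabytes:

    package spark.demo   // MemoryParam is private[spark]

    import spark.util.MemoryParam

    object MemDemo extends App {
      // Utils.memoryStringToMb turns JVM-style strings into megabytes.
      val mb = "10g" match { case MemoryParam(m) => m }
      println(mb)   // 10240
    }
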
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/MetadataCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala
deleted file mode 100644
index 92909e0..0000000
--- a/core/src/main/scala/spark/util/MetadataCleaner.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-import java.util.concurrent.{TimeUnit, ScheduledFuture, Executors}
-import java.util.{TimerTask, Timer}
-import spark.Logging
-
-
-/**
- * Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
- */
-class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
- private val delaySeconds = MetadataCleaner.getDelaySeconds
- private val periodSeconds = math.max(10, delaySeconds / 10)
- private val timer = new Timer(name + " cleanup timer", true)
-
- private val task = new TimerTask {
- override def run() {
- try {
- cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
- logInfo("Ran metadata cleaner for " + name)
- } catch {
- case e: Exception => logError("Error running cleanup task for " + name, e)
- }
- }
- }
-
- if (delaySeconds > 0) {
- logDebug(
- "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds " +
- "and period of " + periodSeconds + " secs")
- timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)
- }
-
- def cancel() {
- timer.cancel()
- }
-}
-
-
-object MetadataCleaner {
- def getDelaySeconds = System.getProperty("spark.cleaner.ttl", "-1").toInt
- def setDelaySeconds(delay: Int) { System.setProperty("spark.cleaner.ttl", delay.toString) }
-}
-
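
A cleaner is inert unless `spark.cleaner.ttl` is positive; once armed, the timer fires every max(10, ttl/10) seconds and passes the staleness-cutoff timestamp to the callback. A sketch:

    import spark.util.MetadataCleaner

    object CleanerDemo extends App {
      MetadataCleaner.setDelaySeconds(3600)   // arm: entries older than 1h are stale
      val cleaner = new MetadataCleaner("demo",
        cutoff => println("dropping entries older than " + cutoff))
      // The timer now fires every 360 seconds. On shutdown:
      cleaner.cancel()
    }
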
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/MutablePair.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/MutablePair.scala b/core/src/main/scala/spark/util/MutablePair.scala
deleted file mode 100644
index 78d404e..0000000
--- a/core/src/main/scala/spark/util/MutablePair.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-
-/**
- * A tuple of 2 elements. This can be used as an alternative to Scala's Tuple2 when we want to
- * minimize object allocation.
- *
- * @param _1 Element 1 of this MutablePair
- * @param _2 Element 2 of this MutablePair
- */
-case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T1,
- @specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T2]
- (var _1: T1, var _2: T2)
- extends Product2[T1, T2]
-{
- override def toString = "(" + _1 + "," + _2 + ")"
-
- override def canEqual(that: Any): Boolean = that.isInstanceOf[MutablePair[_,_]]
-}
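A minimal sketch of the allocation-saving pattern this enables (hypothetical values): one MutablePair is reused across iterations instead of allocating a fresh Tuple2 per element:

    import spark.util.MutablePair

    val pair = new MutablePair[Int, String](0, "")
    for (i <- 1 to 3) {
      pair._1 = i             // mutate in place; no new object per element
      pair._2 = "value" + i
      println(pair)           // e.g. (1,value1)
    }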
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/NextIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/NextIterator.scala b/core/src/main/scala/spark/util/NextIterator.scala
deleted file mode 100644
index 22163ec..0000000
--- a/core/src/main/scala/spark/util/NextIterator.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-/** Provides a basic/boilerplate Iterator implementation. */
-private[spark] abstract class NextIterator[U] extends Iterator[U] {
-
- private var gotNext = false
- private var nextValue: U = _
- private var closed = false
- protected var finished = false
-
- /**
- * Method for subclasses to implement to provide the next element.
- *
- * If no next element is available, the subclass should set `finished`
- * to `true` and may return any value (it will be ignored).
- *
- * This convention is required because `null` may be a valid value,
- * and using `Option` seems like it might create unnecessary Some/None
- * instances, given some iterators might be called in a tight loop.
- *
- * @return the next element of type U, or any value after `finished` has been set
- */
- protected def getNext(): U
-
- /**
- * Method for subclasses to implement when all elements have been successfully
- * iterated, and the iteration is done.
- *
- * <b>Note:</b> `NextIterator` cannot guarantee that `close` will be
- * called because it has no control over what happens when an exception
- * happens in the user code that is calling hasNext/next.
- *
- * Ideally you should have another try/catch, as in HadoopRDD, that
- * ensures any resources are closed should iteration fail.
- */
- protected def close()
-
- /**
- * Calls the subclass-defined close method, but only once.
- *
- * Usually calling `close` multiple times should be fine, but historically
- * there have been issues with some InputFormats throwing exceptions.
- */
- def closeIfNeeded() {
- if (!closed) {
- close()
- closed = true
- }
- }
-
- override def hasNext: Boolean = {
- if (!finished) {
- if (!gotNext) {
- nextValue = getNext()
- if (finished) {
- closeIfNeeded()
- }
- gotNext = true
- }
- }
- !finished
- }
-
- override def next(): U = {
- if (!hasNext) {
- throw new NoSuchElementException("End of stream")
- }
- gotNext = false
- nextValue
- }
-}
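Because NextIterator is private[spark], the subclassing pattern only appears inside Spark itself (e.g. HadoopRDD). A sketch of that pattern with a hypothetical line reader: getNext() sets `finished` instead of returning a sentinel, and close() releases the resource:

    import java.io.BufferedReader

    class LineIterator(reader: BufferedReader) extends NextIterator[String] {
      override protected def getNext(): String = {
        val line = reader.readLine()
        if (line == null) finished = true  // returned value is ignored once finished
        line
      }
      override protected def close() {
        reader.close()
      }
    }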
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/RateLimitedOutputStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/RateLimitedOutputStream.scala b/core/src/main/scala/spark/util/RateLimitedOutputStream.scala
deleted file mode 100644
index 00f782b..0000000
--- a/core/src/main/scala/spark/util/RateLimitedOutputStream.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-import scala.annotation.tailrec
-
-import java.io.OutputStream
-import java.util.concurrent.TimeUnit._
-
-class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends OutputStream {
- val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
- val CHUNK_SIZE = 8192
- var lastSyncTime = System.nanoTime
- var bytesWrittenSinceSync: Long = 0
-
- override def write(b: Int) {
- waitToWrite(1)
- out.write(b)
- }
-
- override def write(bytes: Array[Byte]) {
- write(bytes, 0, bytes.length)
- }
-
- @tailrec
- override final def write(bytes: Array[Byte], offset: Int, length: Int) {
- val writeSize = math.min(length - offset, CHUNK_SIZE)
- if (writeSize > 0) {
- waitToWrite(writeSize)
- out.write(bytes, offset, writeSize)
- write(bytes, offset + writeSize, length)
- }
- }
-
- override def flush() {
- out.flush()
- }
-
- override def close() {
- out.close()
- }
-
- @tailrec
- private def waitToWrite(numBytes: Int) {
- val now = System.nanoTime
- val elapsedSecs = SECONDS.convert(math.max(now - lastSyncTime, 1), NANOSECONDS)
- val rate = bytesWrittenSinceSync.toDouble / elapsedSecs
- if (rate < bytesPerSec) {
- // It's okay to write; just update some variables and return
- bytesWrittenSinceSync += numBytes
- if (now > lastSyncTime + SYNC_INTERVAL) {
- // Sync interval has passed; let's resync
- lastSyncTime = now
- bytesWrittenSinceSync = numBytes
- }
- } else {
- // Calculate how much time we should sleep to bring ourselves to the desired rate.
- // Based on throttler in Kafka (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala)
- val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS)
- if (sleepTime > 0) Thread.sleep(sleepTime)
- waitToWrite(numBytes)
- }
- }
-}
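A usage sketch (hypothetical path). One caveat worth noting: in the three-argument write, `length` is effectively treated as an end index rather than a byte count (writeSize = length - offset), so the stream honors the OutputStream contract only when offset is 0:

    import java.io.FileOutputStream

    // Cap throughput at roughly 1 MB/s; writes block once the rate is exceeded
    val out = new RateLimitedOutputStream(new FileOutputStream("/tmp/throttled.bin"), 1024 * 1024)
    out.write(new Array[Byte](4 * 1024 * 1024))  // ~4 seconds at 1 MB/s
    out.close()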
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/SerializableBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/SerializableBuffer.scala b/core/src/main/scala/spark/util/SerializableBuffer.scala
deleted file mode 100644
index 7e68426..0000000
--- a/core/src/main/scala/spark/util/SerializableBuffer.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-import java.nio.ByteBuffer
-import java.io.{IOException, ObjectOutputStream, EOFException, ObjectInputStream}
-import java.nio.channels.Channels
-
-/**
- * A wrapper around a java.nio.ByteBuffer that is serializable through Java serialization, to make
- * it easier to pass ByteBuffers in case class messages.
- */
-private[spark]
-class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable {
- def value = buffer
-
- private def readObject(in: ObjectInputStream) {
- val length = in.readInt()
- buffer = ByteBuffer.allocate(length)
- var amountRead = 0
- val channel = Channels.newChannel(in)
- while (amountRead < length) {
- val ret = channel.read(buffer)
- if (ret == -1) {
- throw new EOFException("End of file before fully reading buffer")
- }
- amountRead += ret
- }
- buffer.rewind() // Allow us to read it later
- }
-
- private def writeObject(out: ObjectOutputStream) {
- out.writeInt(buffer.limit())
- if (Channels.newChannel(out).write(buffer) != buffer.limit()) {
- throw new IOException("Could not fully write buffer to output stream")
- }
- buffer.rewind() // Allow us to write it again later
- }
-}
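SerializableBuffer is likewise private[spark]; a sketch of how it lets a ByteBuffer ride inside an ordinary serializable message (hypothetical message class):

    import java.nio.ByteBuffer

    case class TaskPayload(taskId: Long, data: SerializableBuffer)

    val buf = ByteBuffer.wrap("hello".getBytes("UTF-8"))
    val msg = TaskPayload(1L, new SerializableBuffer(buf))
    // After a Java-serialization round trip, msg.data.value yields an
    // equivalent ByteBuffer, rewound and ready to read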
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/StatCounter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/StatCounter.scala b/core/src/main/scala/spark/util/StatCounter.scala
deleted file mode 100644
index 76358d4..0000000
--- a/core/src/main/scala/spark/util/StatCounter.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-/**
- * A class for tracking the statistics of a set of numbers (count, mean and variance) in a
- * numerically robust way. Includes support for merging two StatCounters. Based on
- * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance Welford and Chan's algorithms for running variance]].
- *
- * @constructor Initialize the StatCounter with the given values.
- */
-class StatCounter(values: TraversableOnce[Double]) extends Serializable {
- private var n: Long = 0 // Running count of our values
- private var mu: Double = 0 // Running mean of our values
- private var m2: Double = 0 // Running variance numerator (sum of (x - mean)^2)
-
- merge(values)
-
- /** Initialize the StatCounter with no values. */
- def this() = this(Nil)
-
- /** Add a value into this StatCounter, updating the internal statistics. */
- def merge(value: Double): StatCounter = {
- val delta = value - mu
- n += 1
- mu += delta / n
- m2 += delta * (value - mu)
- this
- }
-
- /** Add multiple values into this StatCounter, updating the internal statistics. */
- def merge(values: TraversableOnce[Double]): StatCounter = {
- values.foreach(v => merge(v))
- this
- }
-
- /** Merge another StatCounter into this one, adding up the internal statistics. */
- def merge(other: StatCounter): StatCounter = {
- if (other == this) {
- merge(other.copy()) // Avoid overwriting fields in a weird order
- } else {
- if (n == 0) {
- mu = other.mu
- m2 = other.m2
- n = other.n
- } else if (other.n != 0) {
- val delta = other.mu - mu
- if (other.n * 10 < n) {
- mu = mu + (delta * other.n) / (n + other.n)
- } else if (n * 10 < other.n) {
- mu = other.mu - (delta * n) / (n + other.n)
- } else {
- mu = (mu * n + other.mu * other.n) / (n + other.n)
- }
- m2 += other.m2 + (delta * delta * n * other.n) / (n + other.n)
- n += other.n
- }
- this
- }
- }
-
- /** Clone this StatCounter */
- def copy(): StatCounter = {
- val other = new StatCounter
- other.n = n
- other.mu = mu
- other.m2 = m2
- other
- }
-
- def count: Long = n
-
- def mean: Double = mu
-
- def sum: Double = n * mu
-
- /** Return the variance of the values. */
- def variance: Double = {
- if (n == 0)
- Double.NaN
- else
- m2 / n
- }
-
- /**
- * Return the sample variance, which corrects for bias in estimating the variance by dividing
- * by N-1 instead of N.
- */
- def sampleVariance: Double = {
- if (n <= 1)
- Double.NaN
- else
- m2 / (n - 1)
- }
-
- /** Return the standard deviation of the values. */
- def stdev: Double = math.sqrt(variance)
-
- /**
- * Return the sample standard deviation of the values, which corrects for bias in estimating the
- * variance by dividing by N-1 instead of N.
- */
- def sampleStdev: Double = math.sqrt(sampleVariance)
-
- override def toString: String = {
- "(count: %d, mean: %f, stdev: %f)".format(count, mean, stdev)
- }
-}
-
-object StatCounter {
- /** Build a StatCounter from a list of values. */
- def apply(values: TraversableOnce[Double]) = new StatCounter(values)
-
- /** Build a StatCounter from a list of values passed as variable-length arguments. */
- def apply(values: Double*) = new StatCounter(values)
-}
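A quick worked example (values 1 through 5, so mean 3, population variance 10/5 = 2, sample variance 10/4 = 2.5); merge(other) mutates the receiver and returns it:

    import spark.util.StatCounter

    val s1 = StatCounter(1.0, 2.0, 3.0)
    val s2 = StatCounter(Seq(4.0, 5.0))
    val merged = s1.merge(s2)
    merged.count          // 5
    merged.mean           // 3.0
    merged.variance       // 2.0
    merged.sampleStdev    // ~1.58 (sqrt of 2.5)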
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/TimeStampedHashMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
deleted file mode 100644
index 07772a0..0000000
--- a/core/src/main/scala/spark/util/TimeStampedHashMap.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConversions
-import scala.collection.mutable.Map
-import scala.collection.immutable
-import spark.scheduler.MapStatus
-
-/**
- * This is a custom implementation of scala.collection.mutable.Map which stores the insertion
- * time stamp along with each key-value pair. Key-value pairs that are older than a particular
- * threshold time can them be removed using the clearOldValues method. This is intended to be a drop-in
- * replacement of scala.collection.mutable.HashMap.
- */
-class TimeStampedHashMap[A, B] extends Map[A, B]() with spark.Logging {
- val internalMap = new ConcurrentHashMap[A, (B, Long)]()
-
- def get(key: A): Option[B] = {
- val value = internalMap.get(key)
- if (value != null) Some(value._1) else None
- }
-
- def iterator: Iterator[(A, B)] = {
- val jIterator = internalMap.entrySet().iterator()
- JavaConversions.asScalaIterator(jIterator).map(kv => (kv.getKey, kv.getValue._1))
- }
-
- override def + [B1 >: B](kv: (A, B1)): Map[A, B1] = {
- val newMap = new TimeStampedHashMap[A, B1]
- newMap.internalMap.putAll(this.internalMap)
- newMap.internalMap.put(kv._1, (kv._2, currentTime))
- newMap
- }
-
- override def - (key: A): Map[A, B] = {
- val newMap = new TimeStampedHashMap[A, B]
- newMap.internalMap.putAll(this.internalMap)
- newMap.internalMap.remove(key)
- newMap
- }
-
- override def += (kv: (A, B)): this.type = {
- internalMap.put(kv._1, (kv._2, currentTime))
- this
- }
-
- // Should we return the previous value directly or as an Option?
- def putIfAbsent(key: A, value: B): Option[B] = {
- val prev = internalMap.putIfAbsent(key, (value, currentTime))
- if (prev != null) Some(prev._1) else None
- }
-
-
- override def -= (key: A): this.type = {
- internalMap.remove(key)
- this
- }
-
- override def update(key: A, value: B) {
- this += ((key, value))
- }
-
- override def apply(key: A): B = {
- val value = internalMap.get(key)
- if (value == null) throw new NoSuchElementException()
- value._1
- }
-
- override def filter(p: ((A, B)) => Boolean): Map[A, B] = {
- JavaConversions.asScalaConcurrentMap(internalMap).map(kv => (kv._1, kv._2._1)).filter(p)
- }
-
- override def empty: Map[A, B] = new TimeStampedHashMap[A, B]()
-
- override def size: Int = internalMap.size
-
- override def foreach[U](f: ((A, B)) => U) {
- val iterator = internalMap.entrySet().iterator()
- while(iterator.hasNext) {
- val entry = iterator.next()
- val kv = (entry.getKey, entry.getValue._1)
- f(kv)
- }
- }
-
- def toMap: immutable.Map[A, B] = iterator.toMap
-
- /**
- * Removes old key-value pairs that have a timestamp earlier than `threshTime`.
- */
- def clearOldValues(threshTime: Long) {
- val iterator = internalMap.entrySet().iterator()
- while(iterator.hasNext) {
- val entry = iterator.next()
- if (entry.getValue._2 < threshTime) {
- logDebug("Removing key " + entry.getKey)
- iterator.remove()
- }
- }
- }
-
- private def currentTime: Long = System.currentTimeMillis()
-
-}
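A sketch of the TTL behavior (timings are illustrative):

    import spark.util.TimeStampedHashMap

    val m = new TimeStampedHashMap[String, Int]()
    m("old") = 1
    Thread.sleep(50)
    m("new") = 2
    // Drop pairs inserted more than 25 ms ago: "old" goes, "new" stays
    m.clearOldValues(System.currentTimeMillis() - 25)
    m.toMap               // Map(new -> 2)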
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/TimeStampedHashSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/TimeStampedHashSet.scala b/core/src/main/scala/spark/util/TimeStampedHashSet.scala
deleted file mode 100644
index 41e3fd8..0000000
--- a/core/src/main/scala/spark/util/TimeStampedHashSet.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-import scala.collection.mutable.Set
-import scala.collection.JavaConversions
-import java.util.concurrent.ConcurrentHashMap
-
-
-class TimeStampedHashSet[A] extends Set[A] {
- val internalMap = new ConcurrentHashMap[A, Long]()
-
- def contains(key: A): Boolean = {
- // Use containsKey: ConcurrentHashMap.contains(Object) tests values, not keys
- internalMap.containsKey(key)
- }
-
- def iterator: Iterator[A] = {
- val jIterator = internalMap.entrySet().iterator()
- JavaConversions.asScalaIterator(jIterator).map(_.getKey)
- }
-
- override def + (elem: A): Set[A] = {
- val newSet = new TimeStampedHashSet[A]
- newSet ++= this
- newSet += elem
- newSet
- }
-
- override def - (elem: A): Set[A] = {
- val newSet = new TimeStampedHashSet[A]
- newSet ++= this
- newSet -= elem
- newSet
- }
-
- override def += (key: A): this.type = {
- internalMap.put(key, currentTime)
- this
- }
-
- override def -= (key: A): this.type = {
- internalMap.remove(key)
- this
- }
-
- override def empty: Set[A] = new TimeStampedHashSet[A]()
-
- override def size(): Int = internalMap.size()
-
- override def foreach[U](f: (A) => U): Unit = {
- val iterator = internalMap.entrySet().iterator()
- while(iterator.hasNext) {
- f(iterator.next.getKey)
- }
- }
-
- /**
- * Removes old values that have a timestamp earlier than `threshTime`.
- */
- def clearOldValues(threshTime: Long) {
- val iterator = internalMap.entrySet().iterator()
- while(iterator.hasNext) {
- val entry = iterator.next()
- if (entry.getValue < threshTime) {
- iterator.remove()
- }
- }
- }
-
- private def currentTime: Long = System.currentTimeMillis()
-}
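The set behaves analogously (illustrative timings):

    import spark.util.TimeStampedHashSet

    val s = new TimeStampedHashSet[String]()
    s += "old"
    Thread.sleep(50)
    s += "new"
    s.clearOldValues(System.currentTimeMillis() - 25)
    s.iterator.toList     // List(new)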
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/util/Vector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/Vector.scala b/core/src/main/scala/spark/util/Vector.scala
deleted file mode 100644
index a47cac3..0000000
--- a/core/src/main/scala/spark/util/Vector.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.util
-
-class Vector(val elements: Array[Double]) extends Serializable {
- def length = elements.length
-
- def apply(index: Int) = elements(index)
-
- def + (other: Vector): Vector = {
- if (length != other.length)
- throw new IllegalArgumentException("Vectors of different length")
- return Vector(length, i => this(i) + other(i))
- }
-
- def add(other: Vector) = this + other
-
- def - (other: Vector): Vector = {
- if (length != other.length)
- throw new IllegalArgumentException("Vectors of different length")
- return Vector(length, i => this(i) - other(i))
- }
-
- def subtract(other: Vector) = this - other
-
- def dot(other: Vector): Double = {
- if (length != other.length)
- throw new IllegalArgumentException("Vectors of different length")
- var ans = 0.0
- var i = 0
- while (i < length) {
- ans += this(i) * other(i)
- i += 1
- }
- return ans
- }
-
- /**
- * Return (this + plus) dot other, without creating any intermediate storage.
- * @param plus the vector added element-wise to this one before the dot product
- * @param other the vector to take the dot product with
- * @return the value of (this + plus) dot other
- */
- def plusDot(plus: Vector, other: Vector): Double = {
- if (length != other.length)
- throw new IllegalArgumentException("Vectors of different length")
- if (length != plus.length)
- throw new IllegalArgumentException("Vectors of different length")
- var ans = 0.0
- var i = 0
- while (i < length) {
- ans += (this(i) + plus(i)) * other(i)
- i += 1
- }
- return ans
- }
-
- def += (other: Vector): Vector = {
- if (length != other.length)
- throw new IllegalArgumentException("Vectors of different length")
- var i = 0
- while (i < length) {
- elements(i) += other(i)
- i += 1
- }
- this
- }
-
- def addInPlace(other: Vector) = this += other
-
- def * (scale: Double): Vector = Vector(length, i => this(i) * scale)
-
- def multiply (d: Double) = this * d
-
- def / (d: Double): Vector = this * (1 / d)
-
- def divide (d: Double) = this / d
-
- def unary_- = this * -1
-
- def sum = elements.reduceLeft(_ + _)
-
- def squaredDist(other: Vector): Double = {
- var ans = 0.0
- var i = 0
- while (i < length) {
- ans += (this(i) - other(i)) * (this(i) - other(i))
- i += 1
- }
- return ans
- }
-
- def dist(other: Vector): Double = math.sqrt(squaredDist(other))
-
- override def toString = elements.mkString("(", ", ", ")")
-}
-
-object Vector {
- def apply(elements: Array[Double]) = new Vector(elements)
-
- def apply(elements: Double*) = new Vector(elements.toArray)
-
- def apply(length: Int, initializer: Int => Double): Vector = {
- val elements: Array[Double] = Array.tabulate(length)(initializer)
- return new Vector(elements)
- }
-
- def zeros(length: Int) = new Vector(new Array[Double](length))
-
- def ones(length: Int) = Vector(length, _ => 1)
-
- class Multiplier(num: Double) {
- def * (vec: Vector) = vec * num
- }
-
- implicit def doubleToMultiplier(num: Double) = new Multiplier(num)
-
- implicit object VectorAccumParam extends spark.AccumulatorParam[Vector] {
- def addInPlace(t1: Vector, t2: Vector) = t1 + t2
-
- def zero(initialValue: Vector) = Vector.zeros(initialValue.length)
- }
-
-}
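A few sketched operations, including left-multiplication via the Multiplier implicit:

    import spark.util.Vector
    import spark.util.Vector._   // doubleToMultiplier enables 2.0 * v

    val a = Vector(1.0, 2.0, 3.0)
    val b = Vector(4.0, 5.0, 6.0)
    a + b           // (5.0, 7.0, 9.0)
    a dot b         // 32.0
    2.0 * a         // (2.0, 4.0, 6.0)
    a.dist(b)       // sqrt(27), about 5.196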
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/resources/test_metrics_config.properties
----------------------------------------------------------------------
diff --git a/core/src/test/resources/test_metrics_config.properties b/core/src/test/resources/test_metrics_config.properties
index 2b31ddf..056a158 100644
--- a/core/src/test/resources/test_metrics_config.properties
+++ b/core/src/test/resources/test_metrics_config.properties
@@ -1,6 +1,6 @@
*.sink.console.period = 10
*.sink.console.unit = seconds
-*.source.jvm.class = spark.metrics.source.JvmSource
+*.source.jvm.class = org.apache.spark.metrics.source.JvmSource
master.sink.console.period = 20
master.sink.console.unit = minutes
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/resources/test_metrics_system.properties
----------------------------------------------------------------------
diff --git a/core/src/test/resources/test_metrics_system.properties b/core/src/test/resources/test_metrics_system.properties
index d5479f0..6f5ecea 100644
--- a/core/src/test/resources/test_metrics_system.properties
+++ b/core/src/test/resources/test_metrics_system.properties
@@ -1,7 +1,7 @@
*.sink.console.period = 10
*.sink.console.unit = seconds
-test.sink.console.class = spark.metrics.sink.ConsoleSink
-test.sink.dummy.class = spark.metrics.sink.DummySink
-test.source.dummy.class = spark.metrics.source.DummySource
+test.sink.console.class = org.apache.spark.metrics.sink.ConsoleSink
+test.sink.dummy.class = org.apache.spark.metrics.sink.DummySink
+test.source.dummy.class = org.apache.spark.metrics.source.DummySource
test.sink.console.period = 20
test.sink.console.unit = minutes
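For deployments carrying their own conf/metrics.properties, the same one-line change applies to any configured sink or source class; a sketch:

    # before the rename
    *.sink.console.class = spark.metrics.sink.ConsoleSink
    # after the rename
    *.sink.console.class = org.apache.spark.metrics.sink.ConsoleSink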