Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:23 UTC

[39/69] [abbrv] [partial] Initial work to rename package to org.apache.spark

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
new file mode 100644
index 0000000..86e8e75
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.worker
+
+import java.text.SimpleDateFormat
+import java.util.Date
+import java.io.File
+
+import scala.collection.mutable.HashMap
+
+import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
+import akka.util.duration._
+
+import org.apache.spark.{Logging, Utils}
+import org.apache.spark.deploy.ExecutorState
+import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.master.Master
+import org.apache.spark.deploy.worker.ui.WorkerWebUI
+import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.util.AkkaUtils
+
+
+private[spark] class Worker(
+    host: String,
+    port: Int,
+    webUiPort: Int,
+    cores: Int,
+    memory: Int,
+    masterUrl: String,
+    workDirPath: String = null)
+  extends Actor with Logging {
+
+  Utils.checkHost(host, "Expected hostname")
+  assert (port > 0)
+
+  val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For worker and executor IDs
+
+  // Send a heartbeat every (heartbeat timeout) / 4, expressed in milliseconds
+  val HEARTBEAT_MILLIS = System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4
+
+  var master: ActorRef = null
+  var masterWebUiUrl : String = ""
+  val workerId = generateWorkerId()
+  var sparkHome: File = null
+  var workDir: File = null
+  val executors = new HashMap[String, ExecutorRunner]
+  val finishedExecutors = new HashMap[String, ExecutorRunner]
+  val publicAddress = {
+    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    if (envVar != null) envVar else host
+  }
+  var webUi: WorkerWebUI = null
+
+  var coresUsed = 0
+  var memoryUsed = 0
+
+  val metricsSystem = MetricsSystem.createMetricsSystem("worker")
+  val workerSource = new WorkerSource(this)
+
+  def coresFree: Int = cores - coresUsed
+  def memoryFree: Int = memory - memoryUsed
+
+  def createWorkDir() {
+    workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
+    try {
+      // "!workDir.exists() && !workDir.mkdirs()" sporadically fails - not sure why.
+      // So attempt to create the directory, then check whether it was actually created.
+      workDir.mkdirs()
+      if ( !workDir.exists() || !workDir.isDirectory) {
+        logError("Failed to create work directory " + workDir)
+        System.exit(1)
+      }
+      assert (workDir.isDirectory)
+    } catch {
+      case e: Exception =>
+        logError("Failed to create work directory " + workDir, e)
+        System.exit(1)
+    }
+  }
+
+  override def preStart() {
+    logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
+      host, port, cores, Utils.megabytesToString(memory)))
+    sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
+    logInfo("Spark home: " + sparkHome)
+    createWorkDir()
+    webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
+
+    webUi.start()
+    connectToMaster()
+
+    metricsSystem.registerSource(workerSource)
+    metricsSystem.start()
+  }
+
+  def connectToMaster() {
+    logInfo("Connecting to master " + masterUrl)
+    master = context.actorFor(Master.toAkkaUrl(masterUrl))
+    master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress)
+    context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+    context.watch(master) // Doesn't work with remote actors, but useful for testing
+  }
+
+  override def receive = {
+    case RegisteredWorker(url) =>
+      masterWebUiUrl = url
+      logInfo("Successfully registered with master")
+      context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
+        master ! Heartbeat(workerId)
+      }
+
+    case RegisterWorkerFailed(message) =>
+      logError("Worker registration failed: " + message)
+      System.exit(1)
+
+    case LaunchExecutor(appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
+      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+      val manager = new ExecutorRunner(
+        appId, execId, appDesc, cores_, memory_, self, workerId, host, new File(execSparkHome_), workDir)
+      executors(appId + "/" + execId) = manager
+      manager.start()
+      coresUsed += cores_
+      memoryUsed += memory_
+      master ! ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None)
+
+    case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
+      master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
+      val fullId = appId + "/" + execId
+      if (ExecutorState.isFinished(state)) {
+        val executor = executors(fullId)
+        logInfo("Executor " + fullId + " finished with state " + state +
+          message.map(" message " + _).getOrElse("") +
+          exitStatus.map(" exitStatus " + _).getOrElse(""))
+        finishedExecutors(fullId) = executor
+        executors -= fullId
+        coresUsed -= executor.cores
+        memoryUsed -= executor.memory
+      }
+
+    case KillExecutor(appId, execId) =>
+      val fullId = appId + "/" + execId
+      executors.get(fullId) match {
+        case Some(executor) =>
+          logInfo("Asked to kill executor " + fullId)
+          executor.kill()
+        case None =>
+          logInfo("Asked to kill unknown executor " + fullId)
+      }
+
+    case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+      masterDisconnected()
+
+    case RequestWorkerState => {
+      sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
+        finishedExecutors.values.toList, masterUrl, cores, memory,
+        coresUsed, memoryUsed, masterWebUiUrl)
+    }
+  }
+
+  def masterDisconnected() {
+    // TODO: It would be nice to try to reconnect to the master, but just shut down for now.
+    // (Note that if reconnecting we would also need to assign IDs differently.)
+    logError("Connection to master failed! Shutting down.")
+    executors.values.foreach(_.kill())
+    System.exit(1)
+  }
+
+  def generateWorkerId(): String = {
+    "worker-%s-%s-%d".format(DATE_FORMAT.format(new Date), host, port)
+  }
+
+  override def postStop() {
+    executors.values.foreach(_.kill())
+    webUi.stop()
+    metricsSystem.stop()
+  }
+}
+
+private[spark] object Worker {
+  def main(argStrings: Array[String]) {
+    val args = new WorkerArguments(argStrings)
+    val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
+      args.memory, args.master, args.workDir)
+    actorSystem.awaitTermination()
+  }
+
+  def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int,
+    masterUrl: String, workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = {
+    // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
+    val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
+    val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory,
+      masterUrl, workDir)), name = "Worker")
+    (actorSystem, boundPort)
+  }
+
+}
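
For reference, a minimal sketch of starting a worker programmatically through the startSystemAndActor helper above. The launcher object, host, master URL, and resource numbers are illustrative; the package declaration is needed because Worker is private[spark], and it assumes a master is already listening on the given URL.

    package org.apache.spark.deploy.worker

    object LocalWorkerLauncher {
      def main(args: Array[String]) {
        val (actorSystem, _) = Worker.startSystemAndActor(
          host = "localhost", port = 0, webUiPort = 8081,
          cores = 4, memory = 4096,                // memory is in MB
          masterUrl = "spark://localhost:7077",    // assumes a master is already running here
          workDir = null)                          // null falls back to SPARK_HOME/work
        actorSystem.awaitTermination()
      }
    }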

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
new file mode 100644
index 0000000..6d91223
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.worker
+
+import org.apache.spark.util.IntParam
+import org.apache.spark.util.MemoryParam
+import org.apache.spark.Utils
+import java.lang.management.ManagementFactory
+
+/**
+ * Command-line parser for the worker.
+ */
+private[spark] class WorkerArguments(args: Array[String]) {
+  var host = Utils.localHostName()
+  var port = 0
+  var webUiPort = 8081
+  var cores = inferDefaultCores()
+  var memory = inferDefaultMemory()
+  var master: String = null
+  var workDir: String = null
+  
+  // Check for settings in environment variables 
+  if (System.getenv("SPARK_WORKER_PORT") != null) {
+    port = System.getenv("SPARK_WORKER_PORT").toInt
+  }
+  if (System.getenv("SPARK_WORKER_CORES") != null) {
+    cores = System.getenv("SPARK_WORKER_CORES").toInt
+  }
+  if (System.getenv("SPARK_WORKER_MEMORY") != null) {
+    memory = Utils.memoryStringToMb(System.getenv("SPARK_WORKER_MEMORY"))
+  }
+  if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) {
+    webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt
+  }
+  if (System.getenv("SPARK_WORKER_DIR") != null) {
+    workDir = System.getenv("SPARK_WORKER_DIR")
+  }
+  
+  parse(args.toList)
+
+  def parse(args: List[String]): Unit = args match {
+    case ("--ip" | "-i") :: value :: tail =>
+      Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
+      host = value
+      parse(tail)
+
+    case ("--host" | "-h") :: value :: tail =>
+      Utils.checkHost(value, "Please use hostname " + value)
+      host = value
+      parse(tail)
+
+    case ("--port" | "-p") :: IntParam(value) :: tail =>
+      port = value
+      parse(tail)
+
+    case ("--cores" | "-c") :: IntParam(value) :: tail =>
+      cores = value
+      parse(tail)
+
+    case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
+      memory = value
+      parse(tail)
+
+    case ("--work-dir" | "-d") :: value :: tail =>
+      workDir = value
+      parse(tail)
+      
+    case "--webui-port" :: IntParam(value) :: tail =>
+      webUiPort = value
+      parse(tail)
+
+    case ("--help" | "-h") :: tail =>
+      printUsageAndExit(0)
+
+    case value :: tail =>
+      if (master != null) {  // Two positional arguments were given
+        printUsageAndExit(1)
+      }
+      master = value
+      parse(tail)
+
+    case Nil =>
+      if (master == null) {  // No positional argument was given
+        printUsageAndExit(1)
+      }
+
+    case _ =>
+      printUsageAndExit(1)
+  }
+
+  /**
+   * Print usage and exit JVM with the given exit code.
+   */
+  def printUsageAndExit(exitCode: Int) {
+    System.err.println(
+      "Usage: Worker [options] <master>\n" +
+      "\n" +
+      "Master must be a URL of the form spark://hostname:port\n" +
+      "\n" +
+      "Options:\n" +
+      "  -c CORES, --cores CORES  Number of cores to use\n" +
+      "  -m MEM, --memory MEM     Amount of memory to use (e.g. 1000M, 2G)\n" +
+      "  -d DIR, --work-dir DIR   Directory to run apps in (default: SPARK_HOME/work)\n" +
+      "  -i HOST, --ip IP         Hostname to listen on (deprecated, please use --host or -h)\n" +
+      "  -h HOST, --host HOST     Hostname to listen on\n" +
+      "  -p PORT, --port PORT     Port to listen on (default: random)\n" +
+      "  --webui-port PORT        Port for web UI (default: 8081)")
+    System.exit(exitCode)
+  }
+
+  def inferDefaultCores(): Int = {
+    Runtime.getRuntime.availableProcessors()
+  }
+
+  def inferDefaultMemory(): Int = {
+    val ibmVendor = System.getProperty("java.vendor").contains("IBM")
+    var totalMb = 0
+    try {
+      val bean = ManagementFactory.getOperatingSystemMXBean()
+      if (ibmVendor) {
+        val beanClass = Class.forName("com.ibm.lang.management.OperatingSystemMXBean")
+        val method = beanClass.getDeclaredMethod("getTotalPhysicalMemory")
+        totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
+      } else {
+        val beanClass = Class.forName("com.sun.management.OperatingSystemMXBean")
+        val method = beanClass.getDeclaredMethod("getTotalPhysicalMemorySize")
+        totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
+      }
+    } catch {
+      case e: Exception => {
+        totalMb = 2*1024
+        System.out.println("Failed to get total physical memory. Using " + totalMb + " MB")
+      }
+    }
+    // Leave out 1 GB for the operating system, but don't return a negative memory size
+    math.max(totalMb - 1024, 512)
+  }
+}
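
The parse method above is a recursive match over the remaining argument list, with IntParam and MemoryParam acting as extractors for numeric and memory-size strings. A self-contained sketch of the same pattern, with a hypothetical OptParser object and plain toInt in place of the extractors:

    object OptParser {
      var cores = 1
      var memoryMb = 512
      var master: String = null

      def parse(args: List[String]): Unit = args match {
        case ("--cores" | "-c") :: value :: tail =>
          cores = value.toInt
          parse(tail)
        case ("--memory" | "-m") :: value :: tail =>
          memoryMb = value.toInt
          parse(tail)
        case value :: tail =>          // positional argument: the master URL
          master = value
          parse(tail)
        case Nil =>
          require(master != null, "a master URL is required")
      }

      def main(args: Array[String]) {
        parse(List("-c", "8", "spark://localhost:7077"))
        println("cores=" + cores + ", memoryMb=" + memoryMb + ", master=" + master)
      }
    }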

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
new file mode 100644
index 0000000..6427c01
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
@@ -0,0 +1,34 @@
+package org.apache.spark.deploy.worker
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+
+private[spark] class WorkerSource(val worker: Worker) extends Source {
+  val sourceName = "worker"
+  val metricRegistry = new MetricRegistry()
+
+  metricRegistry.register(MetricRegistry.name("executors", "number"), new Gauge[Int] {
+    override def getValue: Int = worker.executors.size
+  })
+
+  // Gauge for the number of cores in use on this worker
+  metricRegistry.register(MetricRegistry.name("coresUsed", "number"), new Gauge[Int] {
+    override def getValue: Int = worker.coresUsed
+  })
+
+  // Gauge for the amount of memory in use on this worker (MB)
+  metricRegistry.register(MetricRegistry.name("memUsed", "MBytes"), new Gauge[Int] {
+    override def getValue: Int = worker.memoryUsed
+  })
+
+  // Gauge for the number of free cores on this worker
+  metricRegistry.register(MetricRegistry.name("coresFree", "number"), new Gauge[Int] {
+    override def getValue: Int = worker.coresFree
+  })
+
+  // Gauge for the amount of free memory on this worker (MB)
+  metricRegistry.register(MetricRegistry.name("memFree", "MBytes"), new Gauge[Int] {
+    override def getValue: Int = worker.memoryFree
+  })
+}
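
Each gauge above closes over the Worker instance and is sampled only when the metrics system polls it. A standalone sketch of the same registration pattern, assuming the Metrics library's ConsoleReporter (not used in this diff) for printing:

    import com.codahale.metrics.{ConsoleReporter, Gauge, MetricRegistry}

    object GaugeSketch {
      def main(args: Array[String]) {
        val registry = new MetricRegistry()
        var tasksRunning = 0

        // As with the worker gauges above, the value is read lazily each time it is reported
        registry.register(MetricRegistry.name("sketch", "tasksRunning", "number"), new Gauge[Int] {
          override def getValue: Int = tasksRunning
        })

        val reporter = ConsoleReporter.forRegistry(registry).build()
        tasksRunning = 3
        reporter.report()   // prints the current value of sketch.tasksRunning.number (3)
      }
    }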

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
new file mode 100644
index 0000000..6192c23
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.worker.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import akka.dispatch.Await
+import akka.pattern.ask
+import akka.util.duration._
+
+import net.liftweb.json.JsonAST.JValue
+
+import org.apache.spark.Utils
+import org.apache.spark.deploy.JsonProtocol
+import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
+import org.apache.spark.deploy.worker.ExecutorRunner
+import org.apache.spark.ui.UIUtils
+
+
+private[spark] class IndexPage(parent: WorkerWebUI) {
+  val workerActor = parent.worker.self
+  val worker = parent.worker
+  val timeout = parent.timeout
+
+  def renderJson(request: HttpServletRequest): JValue = {
+    val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
+    val workerState = Await.result(stateFuture, 30 seconds)
+    JsonProtocol.writeWorkerState(workerState)
+  }
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
+    val workerState = Await.result(stateFuture, 30 seconds)
+
+    val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs")
+    val runningExecutorTable =
+      UIUtils.listingTable(executorHeaders, executorRow, workerState.executors)
+    val finishedExecutorTable =
+      UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors)
+
+    val content =
+        <div class="row-fluid"> <!-- Worker Details -->
+          <div class="span12">
+            <ul class="unstyled">
+              <li><strong>ID:</strong> {workerState.workerId}</li>
+              <li><strong>
+                Master URL:</strong> {workerState.masterUrl}
+              </li>
+              <li><strong>Cores:</strong> {workerState.cores} ({workerState.coresUsed} Used)</li>
+              <li><strong>Memory:</strong> {Utils.megabytesToString(workerState.memory)}
+                ({Utils.megabytesToString(workerState.memoryUsed)} Used)</li>
+            </ul>
+            <p><a href={workerState.masterWebUiUrl}>Back to Master</a></p>
+          </div>
+        </div>
+
+        <div class="row-fluid"> <!-- Running Executors -->
+          <div class="span12">
+            <h4> Running Executors {workerState.executors.size} </h4>
+            {runningExecutorTable}
+          </div>
+        </div>
+
+        <div class="row-fluid"> <!-- Finished Executors  -->
+          <div class="span12">
+            <h4> Finished Executors </h4>
+            {finishedExecutorTable}
+          </div>
+        </div>;
+
+    UIUtils.basicSparkPage(content, "Spark Worker at %s:%s".format(
+      workerState.host, workerState.port))
+  }
+
+  def executorRow(executor: ExecutorRunner): Seq[Node] = {
+    <tr>
+      <td>{executor.execId}</td>
+      <td>{executor.cores}</td>
+      <td sorttable_customkey={executor.memory.toString}>
+        {Utils.megabytesToString(executor.memory)}
+      </td>
+      <td>
+        <ul class="unstyled">
+          <li><strong>ID:</strong> {executor.appId}</li>
+          <li><strong>Name:</strong> {executor.appDesc.name}</li>
+          <li><strong>User:</strong> {executor.appDesc.user}</li>
+        </ul>
+      </td>
+      <td>
+        <a href={"logPage?appId=%s&executorId=%s&logType=stdout"
+          .format(executor.appId, executor.execId)}>stdout</a>
+        <a href={"logPage?appId=%s&executorId=%s&logType=stderr"
+          .format(executor.appId, executor.execId)}>stderr</a>
+      </td>
+    </tr>
+  }
+
+}
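
UIUtils.listingTable is not part of this diff; based on how it is called above (headers, a row-rendering function, and the data to render), a plausible standalone sketch of that shape, with a hypothetical Exec case class as the row type, might look like:

    import scala.xml.Node

    object ListingTableSketch {
      // headers, a function that renders one row, and the rows themselves
      def listingTable[T](headers: Seq[String], makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] =
        <table>
          <thead><tr>{headers.map(h => <th>{h}</th>)}</tr></thead>
          <tbody>{rows.flatMap(makeRow)}</tbody>
        </table>

      case class Exec(id: Int, cores: Int)
      def execRow(e: Exec): Seq[Node] = <tr><td>{e.id}</td><td>{e.cores}</td></tr>

      def main(args: Array[String]) {
        println(listingTable(Seq("ExecutorID", "Cores"), execRow, Seq(Exec(0, 4), Exec(1, 2))))
      }
    }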

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
new file mode 100644
index 0000000..bb8165a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.worker.ui
+
+import akka.util.{Duration, Timeout}
+
+import java.io.{FileInputStream, File}
+
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.{Handler, Server}
+
+import org.apache.spark.deploy.worker.Worker
+import org.apache.spark.{Utils, Logging}
+import org.apache.spark.ui.JettyUtils
+import org.apache.spark.ui.JettyUtils._
+import org.apache.spark.ui.UIUtils
+
+/**
+ * Web UI server for the standalone worker.
+ */
+private[spark]
+class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
+  extends Logging {
+  implicit val timeout = Timeout(
+    Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
+  val host = Utils.localHostName()
+  val port = requestedPort.getOrElse(
+    System.getProperty("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
+
+  var server: Option[Server] = None
+  var boundPort: Option[Int] = None
+
+  val indexPage = new IndexPage(this)
+
+  val metricsHandlers = worker.metricsSystem.getServletHandlers
+
+  val handlers = metricsHandlers ++ Array[(String, Handler)](
+    ("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)),
+    ("/log", (request: HttpServletRequest) => log(request)),
+    ("/logPage", (request: HttpServletRequest) => logPage(request)),
+    ("/json", (request: HttpServletRequest) => indexPage.renderJson(request)),
+    ("*", (request: HttpServletRequest) => indexPage.render(request))
+  )
+
+  def start() {
+    try {
+      val (srv, bPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
+      server = Some(srv)
+      boundPort = Some(bPort)
+      logInfo("Started Worker web UI at http://%s:%d".format(host, bPort))
+    } catch {
+      case e: Exception =>
+        logError("Failed to create Worker JettyUtils", e)
+        System.exit(1)
+    }
+  }
+
+  def log(request: HttpServletRequest): String = {
+    val defaultBytes = 100 * 1024
+    val appId = request.getParameter("appId")
+    val executorId = request.getParameter("executorId")
+    val logType = request.getParameter("logType")
+    val offset = Option(request.getParameter("offset")).map(_.toLong)
+    val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
+    val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
+
+    val (startByte, endByte) = getByteRange(path, offset, byteLength)
+    val file = new File(path)
+    val logLength = file.length
+
+    val pre = "==== Bytes %s-%s of %s of %s/%s/%s ====\n"
+      .format(startByte, endByte, logLength, appId, executorId, logType)
+    pre + Utils.offsetBytes(path, startByte, endByte)
+  }
+
+  def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = {
+    val defaultBytes = 100 * 1024
+    val appId = request.getParameter("appId")
+    val executorId = request.getParameter("executorId")
+    val logType = request.getParameter("logType")
+    val offset = Option(request.getParameter("offset")).map(_.toLong)
+    val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
+    val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
+
+    val (startByte, endByte) = getByteRange(path, offset, byteLength)
+    val file = new File(path)
+    val logLength = file.length
+
+    val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node>
+
+    val linkToMaster = <p><a href={worker.masterWebUiUrl}>Back to Master</a></p>
+
+    val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>
+
+    val backButton =
+      if (startByte > 0) {
+        <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s"
+          .format(appId, executorId, logType, math.max(startByte-byteLength, 0),
+          byteLength)}>
+          <button type="button" class="btn btn-default">
+            Previous {Utils.bytesToString(math.min(byteLength, startByte))}
+          </button>
+        </a>
+      }
+      else {
+        <button type="button" class="btn btn-default" disabled="disabled">
+          Previous 0 B
+        </button>
+      }
+
+    val nextButton =
+      if (endByte < logLength) {
+        <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s".
+          format(appId, executorId, logType, endByte, byteLength)}>
+          <button type="button" class="btn btn-default">
+            Next {Utils.bytesToString(math.min(byteLength, logLength-endByte))}
+          </button>
+        </a>
+      }
+      else {
+        <button type="button" class="btn btn-default" disabled="disabled">
+          Next 0 B
+        </button>
+      }
+
+    val content =
+      <html>
+        <body>
+          {linkToMaster}
+          <div>
+            <div style="float:left;width:40%">{backButton}</div>
+            <div style="float:left;">{range}</div>
+            <div style="float:right;">{nextButton}</div>
+          </div>
+          <br />
+          <div style="height:500px;overflow:auto;padding:5px;">
+            <pre>{logText}</pre>
+          </div>
+        </body>
+      </html>
+    UIUtils.basicSparkPage(content, logType + " log page for " + appId)
+  }
+
+  /** Determine the byte range for a log or log page. */
+  def getByteRange(path: String, offset: Option[Long], byteLength: Int)
+  : (Long, Long) = {
+    val defaultBytes = 100 * 1024
+    val maxBytes = 1024 * 1024
+
+    val file = new File(path)
+    val logLength = file.length()
+    val getOffset = offset.getOrElse(logLength-defaultBytes)
+
+    val startByte =
+      if (getOffset < 0) 0L
+      else if (getOffset > logLength) logLength
+      else getOffset
+
+    val logPageLength = math.min(byteLength, maxBytes)
+
+    val endByte = math.min(startByte+logPageLength, logLength)
+
+    (startByte, endByte)
+  }
+
+  def stop() {
+    server.foreach(_.stop())
+  }
+}
+
+private[spark] object WorkerWebUI {
+  val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
+  val DEFAULT_PORT="8081"
+}
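
getByteRange above clamps the requested offset into the file and caps the page at maxBytes, defaulting to the tail of the log. A standalone sketch of the same arithmetic, taking the file length directly so it can run without a log file on disk:

    object ByteRangeSketch {
      val defaultBytes = 100 * 1024     // default page size, as in WorkerWebUI
      val maxBytes = 1024 * 1024        // hard cap on a single page

      def byteRange(logLength: Long, offset: Option[Long], byteLength: Int): (Long, Long) = {
        val requested = offset.getOrElse(logLength - defaultBytes)      // default: tail of the log
        val startByte = math.min(math.max(requested, 0L), logLength)    // clamp into [0, logLength]
        val endByte = math.min(startByte + math.min(byteLength, maxBytes), logLength)
        (startByte, endByte)
      }

      def main(args: Array[String]) {
        // A 1 MB log with no offset and the default page size shows the last 100 KB
        println(byteRange(1024 * 1024, None, defaultBytes))   // (946176,1048576)
      }
    }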

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
new file mode 100644
index 0000000..5446a3f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io.{File}
+import java.lang.management.ManagementFactory
+import java.nio.ByteBuffer
+import java.util.concurrent._
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.scheduler._
+import org.apache.spark._
+
+
+/**
+ * The Mesos executor for Spark.
+ */
+private[spark] class Executor(
+    executorId: String,
+    slaveHostname: String,
+    properties: Seq[(String, String)])
+  extends Logging
+{
+  // Application dependencies (added through SparkContext) that we've fetched so far on this node.
+  // Each map holds the master's timestamp for the version of that file or JAR we got.
+  private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
+  private val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
+
+  private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
+
+  initLogging()
+
+  // No ip or host:port - just hostname
+  Utils.checkHost(slaveHostname, "Expected executed slave to be a hostname")
+  // must not have port specified.
+  assert (0 == Utils.parseHostPort(slaveHostname)._2)
+
+  // Make sure the local hostname we report matches the cluster scheduler's name for this host
+  Utils.setCustomHostname(slaveHostname)
+
+  // Set spark.* system properties from executor arg
+  for ((key, value) <- properties) {
+    System.setProperty(key, value)
+  }
+
+  // If we are in YARN mode, systems can have different disk layouts, so we must set
+  // spark.local.dir to what YARN on this system says is available. This will be used
+  // later when the SparkEnv is created.
+  if (java.lang.Boolean.valueOf(System.getenv("SPARK_YARN_MODE"))) {
+    System.setProperty("spark.local.dir", getYarnLocalDirs())
+  }
+
+  // Create our ClassLoader and set it on this thread
+  private val urlClassLoader = createClassLoader()
+  private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
+  Thread.currentThread.setContextClassLoader(replClassLoader)
+
+  // Make any thread terminations due to uncaught exceptions kill the entire
+  // executor process to avoid surprising stalls.
+  Thread.setDefaultUncaughtExceptionHandler(
+    new Thread.UncaughtExceptionHandler {
+      override def uncaughtException(thread: Thread, exception: Throwable) {
+        try {
+          logError("Uncaught exception in thread " + thread, exception)
+
+          // We may have been called from a shutdown hook. If so, we must not call System.exit().
+          // (If we do, we will deadlock.)
+          if (!Utils.inShutdown()) {
+            if (exception.isInstanceOf[OutOfMemoryError]) {
+              System.exit(ExecutorExitCode.OOM)
+            } else {
+              System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+            }
+          }
+        } catch {
+          case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+          case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
+        }
+      }
+    }
+  )
+
+  val executorSource = new ExecutorSource(this)
+
+  // Initialize Spark environment (using system properties read above)
+  val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
+  SparkEnv.set(env)
+  env.metricsSystem.registerSource(executorSource)
+
+  private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
+
+  // Start worker thread pool
+  val threadPool = new ThreadPoolExecutor(
+    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
+
+  def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
+    threadPool.execute(new TaskRunner(context, taskId, serializedTask))
+  }
+
+  /** Get the Yarn approved local directories. */
+  private def getYarnLocalDirs(): String = {
+    // Hadoop 0.23 and 2.x have different environment variable names for the
+    // local dirs, so let's check both. We assume one of the two is set.
+    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+      .getOrElse(Option(System.getenv("LOCAL_DIRS"))
+      .getOrElse(""))
+
+    if (localDirs.isEmpty()) {
+      throw new Exception("Yarn Local dirs can't be empty")
+    }
+    return localDirs
+  }
+
+  class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
+    extends Runnable {
+
+    override def run() {
+      val startTime = System.currentTimeMillis()
+      SparkEnv.set(env)
+      Thread.currentThread.setContextClassLoader(replClassLoader)
+      val ser = SparkEnv.get.closureSerializer.newInstance()
+      logInfo("Running task ID " + taskId)
+      context.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
+      var attemptedTask: Option[Task[Any]] = None
+      var taskStart: Long = 0
+      def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum
+      val startGCTime = getTotalGCTime
+
+      try {
+        SparkEnv.set(env)
+        Accumulators.clear()
+        val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
+        updateDependencies(taskFiles, taskJars)
+        val task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
+        attemptedTask = Some(task)
+        logInfo("Its epoch is " + task.epoch)
+        env.mapOutputTracker.updateEpoch(task.epoch)
+        taskStart = System.currentTimeMillis()
+        val value = task.run(taskId.toInt)
+        val taskFinish = System.currentTimeMillis()
+        for (m <- task.metrics) {
+          m.hostname = Utils.localHostName
+          m.executorDeserializeTime = (taskStart - startTime).toInt
+          m.executorRunTime = (taskFinish - taskStart).toInt
+          m.jvmGCTime = getTotalGCTime - startGCTime
+        }
+        // TODO: I'd also like to track the time it takes to serialize the task results, but that is a huge
+        // headache, because we need to serialize the task metrics first. If TaskMetrics had a custom
+        // serialized format, we could just change the relevant bytes in the byte buffer.
+        val accumUpdates = Accumulators.values
+        val result = new TaskResult(value, accumUpdates, task.metrics.getOrElse(null))
+        val serializedResult = ser.serialize(result)
+        logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit)
+        if (serializedResult.limit >= (akkaFrameSize - 1024)) {
+          context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(TaskResultTooBigFailure()))
+          return
+        }
+        context.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
+        logInfo("Finished task ID " + taskId)
+      } catch {
+        case ffe: FetchFailedException => {
+          val reason = ffe.toTaskEndReason
+          context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
+        }
+
+        case t: Throwable => {
+          val serviceTime = (System.currentTimeMillis() - taskStart).toInt
+          val metrics = attemptedTask.flatMap(t => t.metrics)
+          for (m <- metrics) {
+            m.executorRunTime = serviceTime
+            m.jvmGCTime = getTotalGCTime - startGCTime
+          }
+          val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
+          context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
+
+          // TODO: Should we exit the whole executor here? On the one hand, the failed task may
+          // have left some weird state around depending on when the exception was thrown, but on
+          // the other hand, maybe we could detect that when future tasks fail and exit then.
+          logError("Exception in task ID " + taskId, t)
+          //System.exit(1)
+        }
+      }
+    }
+  }
+
+  /**
+   * Create a ClassLoader for use in tasks, adding any JARs specified by the user or any classes
+   * created by the interpreter to the search path
+   */
+  private def createClassLoader(): ExecutorURLClassLoader = {
+    var loader = this.getClass.getClassLoader
+
+    // For each of the jars in the jarSet, add them to the class loader.
+    // We assume each of the files has already been fetched.
+    val urls = currentJars.keySet.map { uri =>
+      new File(uri.split("/").last).toURI.toURL
+    }.toArray
+    new ExecutorURLClassLoader(urls, loader)
+  }
+
+  /**
+   * If the REPL is in use, add another ClassLoader that will read
+   * new classes defined by the REPL as the user types code
+   */
+  private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = {
+    val classUri = System.getProperty("spark.repl.class.uri")
+    if (classUri != null) {
+      logInfo("Using REPL class URI: " + classUri)
+      try {
+        val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
+          .asInstanceOf[Class[_ <: ClassLoader]]
+        val constructor = klass.getConstructor(classOf[String], classOf[ClassLoader])
+        return constructor.newInstance(classUri, parent)
+      } catch {
+        case _: ClassNotFoundException =>
+          logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")
+          System.exit(1)
+          null
+      }
+    } else {
+      return parent
+    }
+  }
+
+  /**
+   * Download any missing dependencies if we receive a new set of files and JARs from the
+   * SparkContext. Also adds any new JARs we fetched to the class loader.
+   */
+  private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
+    synchronized {
+      // Fetch missing dependencies
+      for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
+        logInfo("Fetching " + name + " with timestamp " + timestamp)
+        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory))
+        currentFiles(name) = timestamp
+      }
+      for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
+        logInfo("Fetching " + name + " with timestamp " + timestamp)
+        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory))
+        currentJars(name) = timestamp
+        // Add it to our class loader
+        val localName = name.split("/").last
+        val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
+        if (!urlClassLoader.getURLs.contains(url)) {
+          logInfo("Adding " + url + " to class loader")
+          urlClassLoader.addURL(url)
+        }
+      }
+    }
+  }
+}
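
updateDependencies above refetches a file or JAR only when the driver reports a newer timestamp than the one cached in currentFiles/currentJars. A standalone sketch of that refresh rule, with the actual fetch stubbed out:

    import scala.collection.mutable.HashMap

    object DependencyRefreshSketch {
      private val currentFiles = new HashMap[String, Long]

      private def fetch(name: String) { println("fetching " + name) }   // stand-in for Utils.fetchFile

      def update(newFiles: Seq[(String, Long)]) {
        // Only fetch when the reported timestamp is newer than the cached one
        for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
          fetch(name)
          currentFiles(name) = timestamp
        }
      }

      def main(args: Array[String]) {
        update(Seq(("lib.jar", 100L)))   // fetched: not seen before
        update(Seq(("lib.jar", 100L)))   // skipped: same timestamp
        update(Seq(("lib.jar", 200L)))   // fetched: newer timestamp
      }
    }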

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala
new file mode 100644
index 0000000..ad7dd34
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.nio.ByteBuffer
+import org.apache.spark.TaskState.TaskState
+
+/**
+ * A pluggable interface used by the Executor to send updates to the cluster scheduler.
+ */
+private[spark] trait ExecutorBackend {
+  def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
new file mode 100644
index 0000000..e5c9bbb
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+/**
+ * These are exit codes that executors should use to provide the master with information about
+ * executor failures, assuming that the cluster management framework can capture the exit codes (but
+ * perhaps not log files). The exit code constants here are chosen to be unlikely to conflict
+ * with "natural" exit statuses that may be caused by the JVM or user code. In particular,
+ * exit codes 128+ arise on some Unix-likes as a result of signals, and it appears that the
+ * OpenJDK JVM may use exit code 1 in some of its own "last chance" code.
+ */
+private[spark]
+object ExecutorExitCode {
+  /** The default uncaught exception handler was reached. */
+  val UNCAUGHT_EXCEPTION = 50
+
+  /** The default uncaught exception handler was called and an exception was encountered while
+      logging the exception. */
+  val UNCAUGHT_EXCEPTION_TWICE = 51
+
+  /** The default uncaught exception handler was reached, and the uncaught exception was an 
+      OutOfMemoryError. */
+  val OOM = 52
+
+  /** DiskStore failed to create a local temporary directory after many attempts. */
+  val DISK_STORE_FAILED_TO_CREATE_DIR = 53
+
+  def explainExitCode(exitCode: Int): String = {
+    exitCode match {
+      case UNCAUGHT_EXCEPTION => "Uncaught exception"
+      case UNCAUGHT_EXCEPTION_TWICE => "Uncaught exception, and logging the exception failed"
+      case OOM => "OutOfMemoryError"
+      case DISK_STORE_FAILED_TO_CREATE_DIR =>
+        "Failed to create local directory (bad spark.local.dir?)"
+      case _ => 
+        "Unknown executor exit code (" + exitCode + ")" + (
+          if (exitCode > 128)
+            " (died from signal " + (exitCode - 128) + "?)"
+          else
+            ""
+        )
+    }
+  }
+}
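
A small usage sketch of explainExitCode, including the 128-plus-signal convention mentioned in the comment; the package declaration is needed because the object is private[spark], and the ExitCodeSketch wrapper is illustrative:

    package org.apache.spark.executor

    object ExitCodeSketch {
      def main(args: Array[String]) {
        Seq(ExecutorExitCode.OOM, 137).foreach { code =>
          println("Executor exited with code " + code + ": " + ExecutorExitCode.explainExitCode(code))
        }
        // 52  => OutOfMemoryError
        // 137 => Unknown executor exit code (137) (died from signal 9?)   (128 + SIGKILL)
      }
    }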

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
new file mode 100644
index 0000000..17653cd
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -0,0 +1,55 @@
+package org.apache.spark.executor
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.fs.LocalFileSystem
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.metrics.source.Source
+
+class ExecutorSource(val executor: Executor) extends Source {
+  private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
+    FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption
+
+  private def registerFileSystemStat[T](
+        scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = {
+    metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] {
+      override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue)
+    })
+  }
+
+  val metricRegistry = new MetricRegistry()
+  val sourceName = "executor"
+
+  // Gauge for the number of tasks the executor thread pool is actively executing
+  metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {
+    override def getValue: Int = executor.threadPool.getActiveCount()
+  })
+
+  // Gauge for executor thread pool's approximate total number of tasks that have been completed
+  metricRegistry.register(MetricRegistry.name("threadpool", "completeTask", "count"), new Gauge[Long] {
+    override def getValue: Long = executor.threadPool.getCompletedTaskCount()
+  })
+
+  // Gauge for executor thread pool's current number of threads
+  metricRegistry.register(MetricRegistry.name("threadpool", "currentPool", "size"), new Gauge[Int] {
+    override def getValue: Int = executor.threadPool.getPoolSize()
+  })
+
+  // Gauge for the largest number of threads that have ever simultaneously been in the executor thread pool
+  metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] {
+    override def getValue: Int = executor.threadPool.getMaximumPoolSize()
+  })
+
+  // Gauge for file system stats of this executor
+  for (scheme <- Array("hdfs", "file")) {
+    registerFileSystemStat(scheme, "bytesRead", _.getBytesRead(), 0L)
+    registerFileSystemStat(scheme, "bytesWritten", _.getBytesWritten(), 0L)
+    registerFileSystemStat(scheme, "readOps", _.getReadOps(), 0)
+    registerFileSystemStat(scheme, "largeReadOps", _.getLargeReadOps(), 0)
+    registerFileSystemStat(scheme, "writeOps", _.getWriteOps(), 0)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala
new file mode 100644
index 0000000..f9bfe8e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.net.{URLClassLoader, URL}
+
+/**
+ * The addURL method in URLClassLoader is protected. We subclass it to make this accessible.
+ */
+private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
+  extends URLClassLoader(urls, parent) {
+
+  override def addURL(url: URL) {
+    super.addURL(url)
+  }
+}
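
A self-contained sketch of the same trick outside Spark: subclass URLClassLoader solely to widen addURL from protected to public, then add jars discovered at runtime (the MutableURLClassLoader name and jar path are illustrative):

    import java.io.File
    import java.net.{URL, URLClassLoader}

    object ClassLoaderSketch {
      class MutableURLClassLoader(urls: Array[URL], parent: ClassLoader)
        extends URLClassLoader(urls, parent) {
        override def addURL(url: URL) { super.addURL(url) }   // expose the protected method
      }

      def main(args: Array[String]) {
        val loader = new MutableURLClassLoader(Array.empty[URL], getClass.getClassLoader)
        val jar = new File("downloaded-dependency.jar")   // hypothetical jar fetched at runtime
        loader.addURL(jar.toURI.toURL)
        println(loader.getURLs.toList)                    // now contains the new jar URL
      }
    }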

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
new file mode 100644
index 0000000..410a94d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.nio.ByteBuffer
+import org.apache.mesos.{Executor => MesosExecutor, MesosExecutorDriver, MesosNativeLibrary, ExecutorDriver}
+import org.apache.mesos.Protos.{TaskState => MesosTaskState, TaskStatus => MesosTaskStatus, _}
+import org.apache.spark.TaskState.TaskState
+import com.google.protobuf.ByteString
+import org.apache.spark.{Utils, Logging}
+import org.apache.spark.TaskState
+
+private[spark] class MesosExecutorBackend
+  extends MesosExecutor
+  with ExecutorBackend
+  with Logging {
+
+  var executor: Executor = null
+  var driver: ExecutorDriver = null
+
+  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
+    val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
+    driver.sendStatusUpdate(MesosTaskStatus.newBuilder()
+      .setTaskId(mesosTaskId)
+      .setState(TaskState.toMesos(state))
+      .setData(ByteString.copyFrom(data))
+      .build())
+  }
+
+  override def registered(
+      driver: ExecutorDriver,
+      executorInfo: ExecutorInfo,
+      frameworkInfo: FrameworkInfo,
+      slaveInfo: SlaveInfo) {
+    logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
+    this.driver = driver
+    val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
+    executor = new Executor(
+      executorInfo.getExecutorId.getValue,
+      slaveInfo.getHostname,
+      properties)
+  }
+
+  override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
+    val taskId = taskInfo.getTaskId.getValue.toLong
+    if (executor == null) {
+      logError("Received launchTask but executor was null")
+    } else {
+      executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
+    }
+  }
+
+  override def error(d: ExecutorDriver, message: String) {
+    logError("Error from Mesos: " + message)
+  }
+
+  override def killTask(d: ExecutorDriver, t: TaskID) {
+    logWarning("Mesos asked us to kill task " + t.getValue + "; ignoring (not yet implemented)")
+  }
+
+  override def reregistered(d: ExecutorDriver, p2: SlaveInfo) {}
+
+  override def disconnected(d: ExecutorDriver) {}
+
+  override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}
+
+  override def shutdown(d: ExecutorDriver) {}
+}
+
+/**
+ * Entry point for Mesos executor.
+ */
+private[spark] object MesosExecutorBackend {
+  def main(args: Array[String]) {
+    MesosNativeLibrary.load()
+    // Create a new Executor and start it running
+    val runner = new MesosExecutorBackend()
+    new MesosExecutorDriver(runner).run()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
new file mode 100644
index 0000000..65801f7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.nio.ByteBuffer
+
+import akka.actor.{ActorRef, Actor, Props, Terminated}
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
+
+import org.apache.spark.{Logging, Utils, SparkEnv}
+import org.apache.spark.TaskState.TaskState
+import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
+import org.apache.spark.util.AkkaUtils
+
+
+private[spark] class StandaloneExecutorBackend(
+    driverUrl: String,
+    executorId: String,
+    hostPort: String,
+    cores: Int)
+  extends Actor
+  with ExecutorBackend
+  with Logging {
+
+  Utils.checkHostPort(hostPort, "Expected hostport")
+
+  var executor: Executor = null
+  var driver: ActorRef = null
+
+  override def preStart() {
+    logInfo("Connecting to driver: " + driverUrl)
+    driver = context.actorFor(driverUrl)
+    driver ! RegisterExecutor(executorId, hostPort, cores)
+    context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+    context.watch(driver) // Doesn't work with remote actors, but useful for testing
+  }
+
+  override def receive = {
+    case RegisteredExecutor(sparkProperties) =>
+      logInfo("Successfully registered with driver")
+      // Make this host instead of hostPort ?
+      executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)
+
+    case RegisterExecutorFailed(message) =>
+      logError("Slave registration failed: " + message)
+      System.exit(1)
+
+    case LaunchTask(taskDesc) =>
+      logInfo("Got assigned task " + taskDesc.taskId)
+      if (executor == null) {
+        logError("Received launchTask but executor was null")
+        System.exit(1)
+      } else {
+        executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+      }
+
+    case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+      logError("Driver terminated or disconnected! Shutting down.")
+      System.exit(1)
+  }
+
+  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
+    driver ! StatusUpdate(executorId, taskId, state, data)
+  }
+}
+
+private[spark] object StandaloneExecutorBackend {
+  def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
+    // Sanity check: make sure we were given a plain hostname, not a host:port pair
+    Utils.checkHost(hostname)
+
+    // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
+    // before we have set up all of our system properties, etc.
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
+    // Publish the actual bound host and port via the spark.hostPort property
+    val sparkHostPort = hostname + ":" + boundPort
+    System.setProperty("spark.hostPort", sparkHostPort)
+    val actor = actorSystem.actorOf(
+      Props(new StandaloneExecutorBackend(driverUrl, executorId, sparkHostPort, cores)),
+      name = "Executor")
+    actorSystem.awaitTermination()
+  }
+
+  def main(args: Array[String]) {
+    if (args.length < 4) {
+      // The optional trailing app ID argument makes it easy to identify and kill rogue executors
+      System.err.println("Usage: StandaloneExecutorBackend <driverUrl> <executorId> <hostname> <cores> [<appid>]")
+      System.exit(1)
+    }
+    run(args(0), args(1), args(2), args(3).toInt)
+  }
+}
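
A minimal sketch of starting this backend programmatically through run(); the object name, driver URL (including its actor path), executor id, hostname and core count below are illustrative assumptions rather than values taken from this patch:

package org.apache.spark.executor

object StandaloneExecutorBackendExample {
  def main(args: Array[String]) {
    // Placeholder driver URL; a real one is generated by the driver's actor system, and the
    // actor path shown here is an assumption.
    val driverUrl = "akka://spark@driver-host:12345/user/StandaloneScheduler"
    StandaloneExecutorBackend.run(driverUrl, "executor-1", "worker-host", 4)
  }
}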

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
new file mode 100644
index 0000000..f311141
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+class TaskMetrics extends Serializable {
+  /**
+   * Name of the host the task runs on
+   */
+  var hostname: String = _
+
+  /**
+   * Time taken on the executor to deserialize this task
+   */
+  var executorDeserializeTime: Int = _
+
+  /**
+   * Time the executor spends actually running the task (including fetching shuffle data)
+   */
+  var executorRunTime: Int = _
+
+  /**
+   * The number of bytes this task transmitted back to the driver as the TaskResult
+   */
+  var resultSize: Long = _
+
+  /**
+   * Amount of time the JVM spent in garbage collection while executing this task
+   */
+  var jvmGCTime: Long = _
+
+  /**
+   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
+   */
+  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
+
+  /**
+   * If this task writes to shuffle output, metrics on the written shuffle data will be collected here
+   */
+  var shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None
+}
+
+object TaskMetrics {
+  private[spark] def empty(): TaskMetrics = new TaskMetrics
+}
+
+
+class ShuffleReadMetrics extends Serializable {
+  /**
+   * Time when the shuffle finishes
+   */
+  var shuffleFinishTime: Long = _
+
+  /**
+   * Total number of blocks fetched in a shuffle (remote or local)
+   */
+  var totalBlocksFetched: Int = _
+
+  /**
+   * Number of remote blocks fetched in a shuffle
+   */
+  var remoteBlocksFetched: Int = _
+
+  /**
+   * Number of local blocks fetched in a shuffle
+   */
+  var localBlocksFetched: Int = _
+
+  /**
+   * Total time spent blocked while waiting for shuffle data to be fetched
+   */
+  var fetchWaitTime: Long = _
+
+  /**
+   * The total amount of time for all the shuffle fetches. This adds up time from overlapping
+   * shuffles, so it can be longer than the task's run time.
+   */
+  var remoteFetchTime: Long = _
+
+  /**
+   * Total number of remote bytes read from a shuffle
+   */
+  var remoteBytesRead: Long = _
+}
+
+class ShuffleWriteMetrics extends Serializable {
+  /**
+   * Number of bytes written for a shuffle
+   */
+  var shuffleBytesWritten: Long = _
+}
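
For illustration, a minimal sketch of how a task runner might populate these metrics; every value, and the object name, is made up for the example, and shuffleReadMetrics is set only because the hypothetical task read shuffle output:

package org.apache.spark.executor

object TaskMetricsExample {
  def main(args: Array[String]) {
    val metrics = new TaskMetrics
    metrics.hostname = "worker-1.example.com"
    metrics.executorDeserializeTime = 12   // illustrative value
    metrics.executorRunTime = 340          // illustrative value
    metrics.jvmGCTime = 15                 // illustrative value
    metrics.resultSize = 2048L             // bytes sent back to the driver in the TaskResult

    // Only set when the task actually read shuffle data.
    val shuffleRead = new ShuffleReadMetrics
    shuffleRead.totalBlocksFetched = 8
    shuffleRead.remoteBlocksFetched = 6
    shuffleRead.localBlocksFetched = 2
    shuffleRead.remoteBytesRead = 1 << 20
    metrics.shuffleReadMetrics = Some(shuffleRead)
  }
}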

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
new file mode 100644
index 0000000..90a0420
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.io
+
+import java.io.{InputStream, OutputStream}
+
+import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
+
+import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+
+
+/**
+ * CompressionCodec allows the customization of the compression implementation used in
+ * block storage.
+ */
+trait CompressionCodec {
+
+  def compressedOutputStream(s: OutputStream): OutputStream
+
+  def compressedInputStream(s: InputStream): InputStream
+}
+
+
+private[spark] object CompressionCodec {
+
+  def createCodec(): CompressionCodec = {
+    // Set the default codec to Snappy since the LZF implementation initializes a pretty large
+    // buffer for every stream, which results in a lot of memory overhead when the number of
+    // shuffle reduce buckets is large.
+    createCodec(classOf[SnappyCompressionCodec].getName)
+  }
+
+  def createCodec(codecName: String): CompressionCodec = {
+    Class.forName(
+      System.getProperty("spark.io.compression.codec", codecName),
+      true,
+      Thread.currentThread.getContextClassLoader).newInstance().asInstanceOf[CompressionCodec]
+  }
+}
+
+
+/**
+ * LZF implementation of [[org.apache.spark.io.CompressionCodec]].
+ */
+class LZFCompressionCodec extends CompressionCodec {
+
+  override def compressedOutputStream(s: OutputStream): OutputStream = {
+    new LZFOutputStream(s).setFinishBlockOnFlush(true)
+  }
+
+  override def compressedInputStream(s: InputStream): InputStream = new LZFInputStream(s)
+}
+
+
+/**
+ * Snappy implementation of [[org.apache.spark.io.CompressionCodec]].
+ * Block size can be configured by spark.io.compression.snappy.block.size.
+ */
+class SnappyCompressionCodec extends CompressionCodec {
+
+  override def compressedOutputStream(s: OutputStream): OutputStream = {
+    val blockSize = System.getProperty("spark.io.compression.snappy.block.size", "32768").toInt
+    new SnappyOutputStream(s, blockSize)
+  }
+
+  override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
+}
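
A minimal usage sketch, assuming it lives under the org.apache.spark package so the private[spark] factory is accessible; the payload and the choice of LZF are illustrative only:

package org.apache.spark.io

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

object CompressionCodecExample {
  def main(args: Array[String]) {
    // Override the Snappy default with the LZF implementation via the system property.
    System.setProperty("spark.io.compression.codec", classOf[LZFCompressionCodec].getName)
    val codec = CompressionCodec.createCodec()

    // Round-trip an illustrative payload through the codec.
    val payload = Array.fill[Byte](4096)(42)
    val buffer = new ByteArrayOutputStream()
    val out = codec.compressedOutputStream(buffer)
    out.write(payload)
    out.close()

    val in = codec.compressedInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val roundTripped = Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
    assert(roundTripped.sameElements(payload))
  }
}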

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
new file mode 100644
index 0000000..0f9c4e0
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics
+
+import java.util.Properties
+import java.io.{File, FileInputStream, InputStream, IOException}
+
+import scala.collection.mutable
+import scala.util.matching.Regex
+
+import org.apache.spark.Logging
+
+private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
+  initLogging()
+
+  val DEFAULT_PREFIX = "*"
+  val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
+  val METRICS_CONF = "metrics.properties"
+
+  val properties = new Properties()
+  var propertyCategories: mutable.HashMap[String, Properties] = null
+
+  private def setDefaultProperties(prop: Properties) {
+    prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
+    prop.setProperty("*.sink.servlet.uri", "/metrics/json")
+    prop.setProperty("*.sink.servlet.sample", "false")
+    prop.setProperty("master.sink.servlet.uri", "/metrics/master/json")
+    prop.setProperty("applications.sink.servlet.uri", "/metrics/applications/json")
+  }
+
+  def initialize() {
+    // Add default properties in case there's no properties file
+    setDefaultProperties(properties)
+
+    // If spark.metrics.conf is not set, try to read metrics.properties from the classpath
+    var is: InputStream = null
+    try {
+      is = configFile match {
+        case Some(f) => new FileInputStream(f)
+        case None => getClass.getClassLoader.getResourceAsStream(METRICS_CONF)
+      }
+
+      if (is != null) {
+        properties.load(is)
+      }
+    } catch {
+      case e: Exception => logError("Error loading configure file", e)
+    } finally {
+      if (is != null) is.close()
+    }
+
+    propertyCategories = subProperties(properties, INSTANCE_REGEX)
+    if (propertyCategories.contains(DEFAULT_PREFIX)) {
+      import scala.collection.JavaConversions._
+
+      val defaultProperty = propertyCategories(DEFAULT_PREFIX)
+      for { (inst, prop) <- propertyCategories
+            if (inst != DEFAULT_PREFIX)
+            (k, v) <- defaultProperty
+            if (prop.getProperty(k) == null) } {
+        prop.setProperty(k, v)
+      }
+    }
+  }
+
+  def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
+    val subProperties = new mutable.HashMap[String, Properties]
+    import scala.collection.JavaConversions._
+    prop.foreach { kv =>
+      if (regex.findPrefixOf(kv._1) != None) {
+        val regex(prefix, suffix) = kv._1
+        subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
+      }
+    }
+    subProperties
+  }
+
+  def getInstance(inst: String): Properties = {
+    propertyCategories.get(inst) match {
+      case Some(s) => s
+      case None => propertyCategories.getOrElse(DEFAULT_PREFIX, new Properties)
+    }
+  }
+}
+
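
A small sketch of how subProperties splits flat keys on the instance prefix; the property keys, values and object name are examples, not defaults shipped with this patch:

package org.apache.spark.metrics

import java.util.Properties

object MetricsConfigExample {
  def main(args: Array[String]) {
    val conf = new MetricsConfig(None)  // no explicit metrics.properties file

    val props = new Properties()
    props.setProperty("master.sink.console.period", "20")
    props.setProperty("*.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")

    // The instance name (or "*") before the first dot becomes the category; the remainder
    // of the key goes into that category's Properties.
    val categories = conf.subProperties(props, conf.INSTANCE_REGEX)
    println(categories("master").getProperty("sink.console.period"))  // prints "20"
    println(categories("*").getProperty("source.jvm.class"))          // prints the JvmSource class name
  }
}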

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
new file mode 100644
index 0000000..bec0c83
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics
+
+import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.spark.Logging
+import org.apache.spark.metrics.sink.{MetricsServlet, Sink}
+import org.apache.spark.metrics.source.Source
+
+/**
+ * The Spark metrics system is created for a specific "instance". It combines sources and
+ * sinks and periodically polls metrics data from the sources, pushing it to the sinks.
+ *
+ * "instance" specifies "who" (which role) uses the metrics system. In Spark there are several
+ * roles, such as master, worker, executor and client driver, and each of them creates its own
+ * metrics system for monitoring, so an instance represents one of these roles. The instances
+ * currently implemented are: master, worker, executor, driver and applications.
+ *
+ * "source" specifies "where" metrics data is collected from. There are two kinds of
+ * source:
+ *   1. Spark internal sources, such as MasterSource and WorkerSource, which collect a Spark
+ *   component's internal state. These sources are tied to an instance and are added after
+ *   the corresponding metrics system has been created.
+ *   2. Common sources, such as JvmSource, which collect low-level state. These are declared
+ *   in the configuration and loaded through reflection.
+ *
+ * "sink" specifies "where" (which destination) metrics data is output to. Several sinks can
+ * coexist, and metrics are flushed to all of them.
+ *
+ * The metrics configuration format is:
+ * [instance].[sink|source].[name].[options] = xxxx
+ *
+ * [instance] can be "master", "worker", "executor", "driver" or "applications", meaning that
+ * only the specified instance has this property.
+ * The wildcard "*" can be used in place of an instance name, meaning that all instances get
+ * this property.
+ *
+ * [sink|source] indicates whether the property belongs to a sink or a source.
+ *
+ * [name] is the custom, user-defined name of the sink or source.
+ *
+ * [options] is a specific property of this source or sink.
+ */
+private[spark] class MetricsSystem private (val instance: String) extends Logging {
+  initLogging()
+
+  val confFile = System.getProperty("spark.metrics.conf")
+  val metricsConfig = new MetricsConfig(Option(confFile))
+
+  val sinks = new mutable.ArrayBuffer[Sink]
+  val sources = new mutable.ArrayBuffer[Source]
+  val registry = new MetricRegistry()
+
+  // Treat MetricsServlet as a special sink because it needs to expose its handlers to the web UI
+  private var metricsServlet: Option[MetricsServlet] = None
+
+  /** Get any UI handlers used by this metrics system. */
+  def getServletHandlers = metricsServlet.map(_.getHandlers).getOrElse(Array())
+
+  metricsConfig.initialize()
+  registerSources()
+  registerSinks()
+
+  def start() {
+    sinks.foreach(_.start)
+  }
+
+  def stop() {
+    sinks.foreach(_.stop)
+  }
+
+  def registerSource(source: Source) {
+    sources += source
+    try {
+      registry.register(source.sourceName, source.metricRegistry)
+    } catch {
+      case e: IllegalArgumentException => logInfo("Metrics already registered", e)
+    }
+  }
+
+  def removeSource(source: Source) {
+    sources -= source
+    registry.removeMatching(new MetricFilter {
+      def matches(name: String, metric: Metric): Boolean = name.startsWith(source.sourceName)
+    })
+  }
+
+  def registerSources() {
+    val instConfig = metricsConfig.getInstance(instance)
+    val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
+
+    // Register all the sources related to instance
+    sourceConfigs.foreach { kv =>
+      val classPath = kv._2.getProperty("class")
+      try {
+        val source = Class.forName(classPath).newInstance()
+        registerSource(source.asInstanceOf[Source])
+      } catch {
+        case e: Exception => logError("Source class " + classPath + " cannot be instantialized", e)
+      }
+    }
+  }
+
+  def registerSinks() {
+    val instConfig = metricsConfig.getInstance(instance)
+    val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
+
+    sinkConfigs.foreach { kv =>
+      val classPath = kv._2.getProperty("class")
+      try {
+        val sink = Class.forName(classPath)
+          .getConstructor(classOf[Properties], classOf[MetricRegistry])
+          .newInstance(kv._2, registry)
+        if (kv._1 == "servlet") {
+          metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
+        } else {
+          sinks += sink.asInstanceOf[Sink]
+        }
+      } catch {
+        case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e)
+      }
+    }
+  }
+}
+
+private[spark] object MetricsSystem {
+  val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r
+  val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r
+
+  val MINIMAL_POLL_UNIT = TimeUnit.SECONDS
+  val MINIMAL_POLL_PERIOD = 1
+
+  def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) {
+    val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit)
+    if (period < MINIMAL_POLL_PERIOD) {
+      throw new IllegalArgumentException("Polling period " + pollPeriod + " " + pollUnit +
+        " is below the minimal polling period")
+    }
+  }
+
+  def createMetricsSystem(instance: String): MetricsSystem = new MetricsSystem(instance)
+}
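
To make the configuration format described above concrete, here is a small sketch that writes an illustrative metrics.properties file and starts a "master" metrics system against it; the temp-file approach, the property values and the object name are assumptions for the example rather than part of this patch:

package org.apache.spark.metrics

import java.io.{File, PrintWriter}

object MetricsSystemExample {
  def main(args: Array[String]) {
    // Illustrative configuration following [instance].[sink|source].[name].[options] = value
    val confFile = File.createTempFile("metrics", ".properties")
    val out = new PrintWriter(confFile)
    out.println("*.source.jvm.class = org.apache.spark.metrics.source.JvmSource")
    out.println("master.sink.console.class = org.apache.spark.metrics.sink.ConsoleSink")
    out.println("master.sink.console.period = 20")
    out.println("master.sink.console.unit = seconds")
    out.close()

    // Point the metrics system at the file and start the "master" instance.
    System.setProperty("spark.metrics.conf", confFile.getAbsolutePath)
    val metricsSystem = MetricsSystem.createMetricsSystem("master")
    metricsSystem.start()
  }
}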

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
new file mode 100644
index 0000000..bce257d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.metrics.MetricsSystem
+
+class ConsoleSink(val property: Properties, val registry: MetricRegistry) extends Sink {
+  val CONSOLE_DEFAULT_PERIOD = 10
+  val CONSOLE_DEFAULT_UNIT = "SECONDS"
+
+  val CONSOLE_KEY_PERIOD = "period"
+  val CONSOLE_KEY_UNIT = "unit"
+
+  val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
+    case Some(s) => s.toInt
+    case None => CONSOLE_DEFAULT_PERIOD
+  }
+
+  val pollUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
+    case Some(s) => TimeUnit.valueOf(s.toUpperCase())
+    case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
+  }
+
+  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+  val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
+      .convertDurationsTo(TimeUnit.MILLISECONDS)
+      .convertRatesTo(TimeUnit.SECONDS)
+      .build()
+
+  override def start() {
+    reporter.start(pollPeriod, pollUnit)
+  }
+
+  override def stop() {
+    reporter.stop()
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
new file mode 100644
index 0000000..3d1a06a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import com.codahale.metrics.{CsvReporter, MetricRegistry}
+
+import java.io.File
+import java.util.{Locale, Properties}
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.metrics.MetricsSystem
+
+class CsvSink(val property: Properties, val registry: MetricRegistry) extends Sink {
+  val CSV_KEY_PERIOD = "period"
+  val CSV_KEY_UNIT = "unit"
+  val CSV_KEY_DIR = "directory"
+
+  val CSV_DEFAULT_PERIOD = 10
+  val CSV_DEFAULT_UNIT = "SECONDS"
+  val CSV_DEFAULT_DIR = "/tmp/"
+
+  val pollPeriod = Option(property.getProperty(CSV_KEY_PERIOD)) match {
+    case Some(s) => s.toInt
+    case None => CSV_DEFAULT_PERIOD
+  }
+
+  val pollUnit = Option(property.getProperty(CSV_KEY_UNIT)) match {
+    case Some(s) => TimeUnit.valueOf(s.toUpperCase())
+    case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT)
+  }
+  
+  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+  val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match {
+    case Some(s) => s
+    case None => CSV_DEFAULT_DIR
+  }
+
+  val reporter: CsvReporter = CsvReporter.forRegistry(registry)
+      .formatFor(Locale.US)
+      .convertDurationsTo(TimeUnit.MILLISECONDS)
+      .convertRatesTo(TimeUnit.SECONDS)
+      .build(new File(pollDir))
+
+  override def start() {
+    reporter.start(pollPeriod, pollUnit)
+  }
+
+  override def stop() {
+    reporter.stop()
+  }
+}
+
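
A small sketch of constructing this sink directly with an explicit Properties object, outside of MetricsSystem; the property values, metric name, output directory and object name are assumptions for the example:

package org.apache.spark.metrics.sink

import java.io.File
import java.util.Properties

import com.codahale.metrics.MetricRegistry

object CsvSinkExample {
  def main(args: Array[String]) {
    val props = new Properties()
    props.setProperty("period", "30")
    props.setProperty("unit", "seconds")
    props.setProperty("directory", "/tmp/spark-metrics/")
    new File("/tmp/spark-metrics/").mkdirs()  // the reporter writes into an existing directory

    val registry = new MetricRegistry()
    registry.counter("example.counter").inc()

    val sink = new CsvSink(props, registry)
    sink.start()  // writes one CSV file per metric into the configured directory every 30 seconds
  }
}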

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
new file mode 100644
index 0000000..621d086
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import com.codahale.metrics.{JmxReporter, MetricRegistry}
+
+import java.util.Properties
+
+class JmxSink(val property: Properties, val registry: MetricRegistry) extends Sink {
+  val reporter: JmxReporter = JmxReporter.forRegistry(registry).build()
+
+  override def start() {
+    reporter.start()
+  }
+
+  override def stop() {
+    reporter.stop()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
new file mode 100644
index 0000000..4e90dd4
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.json.MetricsModule
+
+import com.fasterxml.jackson.databind.ObjectMapper
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.Handler
+
+import org.apache.spark.ui.JettyUtils
+
+class MetricsServlet(val property: Properties, val registry: MetricRegistry) extends Sink {
+  val SERVLET_KEY_URI = "uri"
+  val SERVLET_KEY_SAMPLE = "sample"
+
+  val servletURI = property.getProperty(SERVLET_KEY_URI)
+
+  val servletShowSample = property.getProperty(SERVLET_KEY_SAMPLE).toBoolean
+
+  val mapper = new ObjectMapper().registerModule(
+    new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample))
+
+  def getHandlers = Array[(String, Handler)](
+    (servletURI, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json"))
+  )
+
+  def getMetricsSnapshot(request: HttpServletRequest): String = {
+    mapper.writeValueAsString(registry)
+  }
+
+  override def start() { }
+
+  override def stop() { }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala
new file mode 100644
index 0000000..3a739aa
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+trait Sink {
+  def start: Unit
+  def stop: Unit
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/metrics/source/JvmSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/source/JvmSource.scala b/core/src/main/scala/org/apache/spark/metrics/source/JvmSource.scala
new file mode 100644
index 0000000..75cb2b8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/source/JvmSource.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.source
+
+import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.jvm.{GarbageCollectorMetricSet, MemoryUsageGaugeSet}
+
+class JvmSource extends Source {
+  val sourceName = "jvm"
+  val metricRegistry = new MetricRegistry()
+
+  val gcMetricSet = new GarbageCollectorMetricSet
+  val memGaugeSet = new MemoryUsageGaugeSet
+
+  metricRegistry.registerAll(gcMetricSet)
+  metricRegistry.registerAll(memGaugeSet)
+}