You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/19 21:17:13 UTC
[4/6] [SPARK-1132] Persisting Web UI through refactoring the
SparkListener interface
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 28b019d..06b041e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -45,8 +45,7 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
private[spark] class CoarseMesosSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext,
- master: String,
- appName: String)
+ master: String)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
with MScheduler
with Logging {
@@ -94,7 +93,7 @@ private[spark] class CoarseMesosSchedulerBackend(
setDaemon(true)
override def run() {
val scheduler = CoarseMesosSchedulerBackend.this
- val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
+ val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(sc.appName).build()
driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
try {
val ret = driver.run()
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index bcf0ce1..4092dd0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -41,8 +41,7 @@ import org.apache.spark.util.Utils
private[spark] class MesosSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext,
- master: String,
- appName: String)
+ master: String)
extends SchedulerBackend
with MScheduler
with Logging {
@@ -71,7 +70,7 @@ private[spark] class MesosSchedulerBackend(
setDaemon(true)
override def run() {
val scheduler = MesosSchedulerBackend.this
- val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
+ val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(sc.appName).build()
driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
try {
val ret = driver.run()
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1bf3f4d..71584b6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -29,7 +29,7 @@ import akka.actor.{ActorSystem, Cancellable, Props}
import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream}
import sun.nio.ch.DirectBuffer
-import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException, SecurityManager}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.network._
import org.apache.spark.serializer.Serializer
@@ -92,7 +92,7 @@ private[spark] class BlockManager(
val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),
name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
- // Pending reregistration action being executed asynchronously or null if none
+ // Pending re-registration action being executed asynchronously or null if none
// is pending. Accesses should synchronize on asyncReregisterLock.
var asyncReregisterTask: Future[Unit] = null
val asyncReregisterLock = new Object
@@ -122,10 +122,15 @@ private[spark] class BlockManager(
/**
* Construct a BlockManager with a memory limit set based on system properties.
*/
- def this(execId: String, actorSystem: ActorSystem, master: BlockManagerMaster,
- serializer: Serializer, conf: SparkConf, securityManager: SecurityManager) = {
- this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf), conf,
- securityManager)
+ def this(
+ execId: String,
+ actorSystem: ActorSystem,
+ master: BlockManagerMaster,
+ serializer: Serializer,
+ conf: SparkConf,
+ securityManager: SecurityManager) = {
+ this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf),
+ conf, securityManager)
}
/**
@@ -148,14 +153,15 @@ private[spark] class BlockManager(
* an executor crash.
*
* This function deliberately fails silently if the master returns false (indicating that
- * the slave needs to reregister). The error condition will be detected again by the next
- * heart beat attempt or new block registration and another try to reregister all blocks
+ * the slave needs to re-register). The error condition will be detected again by the next
+ * heart beat attempt or new block registration and another try to re-register all blocks
* will be made then.
*/
private def reportAllBlocks() {
logInfo("Reporting " + blockInfo.size + " blocks to the master.")
for ((blockId, info) <- blockInfo) {
- if (!tryToReportBlockStatus(blockId, info)) {
+ val status = getCurrentBlockStatus(blockId, info)
+ if (!tryToReportBlockStatus(blockId, info, status)) {
logError("Failed to report " + blockId + " to master; giving up.")
return
}
@@ -163,20 +169,20 @@ private[spark] class BlockManager(
}
/**
- * Reregister with the master and report all blocks to it. This will be called by the heart beat
+ * Re-register with the master and report all blocks to it. This will be called by the heart beat
* thread if our heartbeat to the block manager indicates that we were not registered.
*
* Note that this method must be called without any BlockInfo locks held.
*/
def reregister() {
- // TODO: We might need to rate limit reregistering.
- logInfo("BlockManager reregistering with master")
+ // TODO: We might need to rate limit re-registering.
+ logInfo("BlockManager re-registering with master")
master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
reportAllBlocks()
}
/**
- * Reregister with the master sometime soon.
+ * Re-register with the master sometime soon.
*/
def asyncReregister() {
asyncReregisterLock.synchronized {
@@ -192,7 +198,7 @@ private[spark] class BlockManager(
}
/**
- * For testing. Wait for any pending asynchronous reregistration; otherwise, do nothing.
+ * For testing. Wait for any pending asynchronous re-registration; otherwise, do nothing.
*/
def waitForAsyncReregister() {
val task = asyncReregisterTask
@@ -211,15 +217,19 @@ private[spark] class BlockManager(
* message reflecting the current status, *not* the desired storage level in its block info.
* For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
*
- * droppedMemorySize exists to account for when block is dropped from memory to disk (so it
- * is still valid). This ensures that update in master will compensate for the increase in
+ * droppedMemorySize exists to account for when the block is dropped from memory to disk (so
+ * it is still valid). This ensures that update in master will compensate for the increase in
* memory on slave.
*/
- def reportBlockStatus(blockId: BlockId, info: BlockInfo, droppedMemorySize: Long = 0L) {
- val needReregister = !tryToReportBlockStatus(blockId, info, droppedMemorySize)
+ def reportBlockStatus(
+ blockId: BlockId,
+ info: BlockInfo,
+ status: BlockStatus,
+ droppedMemorySize: Long = 0L) {
+ val needReregister = !tryToReportBlockStatus(blockId, info, status, droppedMemorySize)
if (needReregister) {
- logInfo("Got told to reregister updating block " + blockId)
- // Reregistering will report our new block for free.
+ logInfo("Got told to re-register updating block " + blockId)
+ // Re-registering will report our new block for free.
asyncReregister()
}
logDebug("Told master about block " + blockId)
@@ -230,27 +240,41 @@ private[spark] class BlockManager(
* which will be true if the block was successfully recorded and false if
* the slave needs to re-register.
*/
- private def tryToReportBlockStatus(blockId: BlockId, info: BlockInfo,
+ private def tryToReportBlockStatus(
+ blockId: BlockId,
+ info: BlockInfo,
+ status: BlockStatus,
droppedMemorySize: Long = 0L): Boolean = {
- val (curLevel, inMemSize, onDiskSize, tellMaster) = info.synchronized {
+ if (info.tellMaster) {
+ val storageLevel = status.storageLevel
+ val inMemSize = Math.max(status.memSize, droppedMemorySize)
+ val onDiskSize = status.diskSize
+ master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
+ } else true
+ }
+
+ /**
+ * Return the updated storage status of the block with the given ID. More specifically, if
+ * the block is dropped from memory and possibly added to disk, return the new storage level
+ * and the updated in-memory and on-disk sizes.
+ */
+ private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {
+ val (newLevel, inMemSize, onDiskSize) = info.synchronized {
info.level match {
case null =>
- (StorageLevel.NONE, 0L, 0L, false)
+ (StorageLevel.NONE, 0L, 0L)
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
- val storageLevel = StorageLevel(onDisk, inMem, level.deserialized, level.replication)
- val memSize = if (inMem) memoryStore.getSize(blockId) else droppedMemorySize
+ val deserialized = if (inMem) level.deserialized else false
+ val replication = if (inMem || onDisk) level.replication else 1
+ val storageLevel = StorageLevel(onDisk, inMem, deserialized, replication)
+ val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
- (storageLevel, memSize, diskSize, info.tellMaster)
+ (storageLevel, memSize, diskSize)
}
}
-
- if (tellMaster) {
- master.updateBlockInfo(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)
- } else {
- true
- }
+ BlockStatus(newLevel, inMemSize, onDiskSize)
}
/**
@@ -398,10 +422,10 @@ private[spark] class BlockManager(
/**
* Get block from remote block managers as serialized bytes.
*/
- def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
+ def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
logDebug("Getting remote block " + blockId + " as bytes")
doGetRemote(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
- }
+ }
private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
require(blockId != null, "BlockId is null")
@@ -447,9 +471,8 @@ private[spark] class BlockManager(
* so that we can control the maxMegabytesInFlight for the fetch.
*/
def getMultiple(
- blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])], serializer: Serializer)
- : BlockFetcherIterator = {
-
+ blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
+ serializer: Serializer): BlockFetcherIterator = {
val iter =
if (conf.getBoolean("spark.shuffle.use.netty", false)) {
new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer)
@@ -461,8 +484,11 @@ private[spark] class BlockManager(
iter
}
- def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean)
- : Long = {
+ def put(
+ blockId: BlockId,
+ values: Iterator[Any],
+ level: StorageLevel,
+ tellMaster: Boolean): Seq[(BlockId, BlockStatus)] = {
doPut(blockId, IteratorValues(values), level, tellMaster)
}
@@ -472,41 +498,58 @@ private[spark] class BlockManager(
* This is currently used for writing shuffle files out. Callers should handle error
* cases.
*/
- def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
- : BlockObjectWriter = {
+ def getDiskWriter(
+ blockId: BlockId,
+ file: File,
+ serializer: Serializer,
+ bufferSize: Int): BlockObjectWriter = {
val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites)
}
/**
- * Put a new block of values to the block manager. Returns its (estimated) size in bytes.
+ * Put a new block of values to the block manager. Return a list of blocks updated as a
+ * result of this put.
*/
- def put(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel,
- tellMaster: Boolean = true) : Long = {
+ def put(
+ blockId: BlockId,
+ values: ArrayBuffer[Any],
+ level: StorageLevel,
+ tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
require(values != null, "Values is null")
doPut(blockId, ArrayBufferValues(values), level, tellMaster)
}
/**
- * Put a new block of serialized bytes to the block manager.
+ * Put a new block of serialized bytes to the block manager. Return a list of blocks updated
+ * as a result of this put.
*/
- def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel,
- tellMaster: Boolean = true) {
+ def putBytes(
+ blockId: BlockId,
+ bytes: ByteBuffer,
+ level: StorageLevel,
+ tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
require(bytes != null, "Bytes is null")
doPut(blockId, ByteBufferValues(bytes), level, tellMaster)
}
- private def doPut(blockId: BlockId,
- data: Values,
- level: StorageLevel, tellMaster: Boolean = true): Long = {
+ private def doPut(
+ blockId: BlockId,
+ data: Values,
+ level: StorageLevel,
+ tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
+
require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
+ // Return value
+ val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+
// Remember the block's storage level so that we can correctly drop it to disk if it needs
// to be dropped right after it got put into memory. Note, however, that other threads will
// not be able to get() this block until we call markReady on its BlockInfo.
- val myInfo = {
+ val putBlockInfo = {
val tinfo = new BlockInfo(level, tellMaster)
// Do atomically !
val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
@@ -514,7 +557,7 @@ private[spark] class BlockManager(
if (oldBlockOpt.isDefined) {
if (oldBlockOpt.get.waitForReady()) {
logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
- return oldBlockOpt.get.size
+ return updatedBlocks
}
// TODO: So the block info exists - but previous attempt to load it (?) failed.
@@ -536,7 +579,7 @@ private[spark] class BlockManager(
// Ditto for the bytes after the put
var bytesAfterPut: ByteBuffer = null
- // Size of the block in bytes (to return to caller)
+ // Size of the block in bytes
var size = 0L
// If we're storing bytes, then initiate the replication before storing them locally.
@@ -551,7 +594,7 @@ private[spark] class BlockManager(
null
}
- myInfo.synchronized {
+ putBlockInfo.synchronized {
logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
@@ -566,7 +609,7 @@ private[spark] class BlockManager(
case ArrayBufferValues(array) =>
memoryStore.putValues(blockId, array, level, true)
case ByteBufferValues(bytes) => {
- bytes.rewind();
+ bytes.rewind()
memoryStore.putBytes(blockId, bytes, level)
}
}
@@ -575,6 +618,8 @@ private[spark] class BlockManager(
case Right(newBytes) => bytesAfterPut = newBytes
case Left(newIterator) => valuesAfterPut = newIterator
}
+ // Keep track of which blocks are dropped from memory
+ res.droppedBlocks.foreach { block => updatedBlocks += block }
} else {
// Save directly to disk.
// Don't get back the bytes unless we replicate them.
@@ -586,7 +631,7 @@ private[spark] class BlockManager(
case ArrayBufferValues(array) =>
diskStore.putValues(blockId, array, level, askForBytes)
case ByteBufferValues(bytes) => {
- bytes.rewind();
+ bytes.rewind()
diskStore.putBytes(blockId, bytes, level)
}
}
@@ -597,21 +642,25 @@ private[spark] class BlockManager(
}
}
- // Now that the block is in either the memory or disk store, let other threads read it,
- // and tell the master about it.
- marked = true
- myInfo.markReady(size)
- if (tellMaster) {
- reportBlockStatus(blockId, myInfo)
+ val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
+ if (putBlockStatus.storageLevel != StorageLevel.NONE) {
+ // Now that the block is in either the memory or disk store, let other threads read it,
+ // and tell the master about it.
+ marked = true
+ putBlockInfo.markReady(size)
+ if (tellMaster) {
+ reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
+ }
+ updatedBlocks += ((blockId, putBlockStatus))
}
} finally {
- // If we failed at putting the block to memory/disk, notify other possible readers
+ // If we failed in putting the block to memory/disk, notify other possible readers
// that it has failed, and then remove it from the block info map.
- if (! marked) {
+ if (!marked) {
// Note that the remove must happen before markFailure otherwise another thread
// could've inserted a new BlockInfo before we remove it.
blockInfo.remove(blockId)
- myInfo.markFailure()
+ putBlockInfo.markFailure()
logWarning("Putting block " + blockId + " failed")
}
}
@@ -650,7 +699,7 @@ private[spark] class BlockManager(
Utils.getUsedTimeMs(startTimeMs))
}
- size
+ updatedBlocks
}
/**
@@ -687,28 +736,42 @@ private[spark] class BlockManager(
/**
* Write a block consisting of a single object.
*/
- def putSingle(blockId: BlockId, value: Any, level: StorageLevel, tellMaster: Boolean = true) {
+ def putSingle(
+ blockId: BlockId,
+ value: Any,
+ level: StorageLevel,
+ tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
put(blockId, Iterator(value), level, tellMaster)
}
/**
* Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
* store reaches its limit and needs to free up space.
+ *
+ * Return the block status if the given block has been updated, else None.
*/
- def dropFromMemory(blockId: BlockId, data: Either[ArrayBuffer[Any], ByteBuffer]) {
+ def dropFromMemory(
+ blockId: BlockId,
+ data: Either[ArrayBuffer[Any], ByteBuffer]): Option[BlockStatus] = {
+
logInfo("Dropping block " + blockId + " from memory")
val info = blockInfo.get(blockId).orNull
+
+ // If the block has not already been dropped
if (info != null) {
info.synchronized {
// required ? As of now, this will be invoked only for blocks which are ready
// But in case this changes in future, adding for consistency sake.
- if (! info.waitForReady() ) {
+ if (!info.waitForReady()) {
// If we get here, the block write failed.
logWarning("Block " + blockId + " was marked as failure. Nothing to drop")
- return
+ return None
}
+ var blockIsUpdated = false
val level = info.level
+
+ // Drop to disk, if storage level requires
if (level.useDisk && !diskStore.contains(blockId)) {
logInfo("Writing block " + blockId + " to disk")
data match {
@@ -717,24 +780,33 @@ private[spark] class BlockManager(
case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
}
+ blockIsUpdated = true
}
+
+ // Actually drop from memory store
val droppedMemorySize =
if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
- val blockWasRemoved = memoryStore.remove(blockId)
- if (!blockWasRemoved) {
+ val blockIsRemoved = memoryStore.remove(blockId)
+ if (blockIsRemoved) {
+ blockIsUpdated = true
+ } else {
logWarning("Block " + blockId + " could not be dropped from memory as it does not exist")
}
+
+ val status = getCurrentBlockStatus(blockId, info)
if (info.tellMaster) {
- reportBlockStatus(blockId, info, droppedMemorySize)
+ reportBlockStatus(blockId, info, status, droppedMemorySize)
}
if (!level.useDisk) {
// The block is completely gone from this node; forget it so we can put() it again later.
blockInfo.remove(blockId)
}
+ if (blockIsUpdated) {
+ return Some(status)
+ }
}
- } else {
- // The block has already been dropped
}
+ None
}
/**
@@ -766,7 +838,8 @@ private[spark] class BlockManager(
}
blockInfo.remove(blockId)
if (tellMaster && info.tellMaster) {
- reportBlockStatus(blockId, info)
+ val status = getCurrentBlockStatus(blockId, info)
+ reportBlockStatus(blockId, info, status)
}
} else {
// The block has already been removed; do nothing.
@@ -801,7 +874,8 @@ private[spark] class BlockManager(
iterator.remove()
logInfo("Dropped block " + id)
}
- reportBlockStatus(id, info)
+ val status = getCurrentBlockStatus(id, info)
+ reportBlockStatus(id, info, status)
}
}
}
@@ -911,9 +985,8 @@ private[spark] object BlockManager extends Logging {
def blockIdsToBlockManagers(
blockIds: Array[BlockId],
env: SparkEnv,
- blockManagerMaster: BlockManagerMaster = null)
- : Map[BlockId, Seq[BlockManagerId]] =
- {
+ blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[BlockManagerId]] = {
+
// blockManagerMaster != null is used in tests
assert (env != null || blockManagerMaster != null)
val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) {
@@ -932,18 +1005,14 @@ private[spark] object BlockManager extends Logging {
def blockIdsToExecutorIds(
blockIds: Array[BlockId],
env: SparkEnv,
- blockManagerMaster: BlockManagerMaster = null)
- : Map[BlockId, Seq[String]] =
- {
+ blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[String]] = {
blockIdsToBlockManagers(blockIds, env, blockManagerMaster).mapValues(s => s.map(_.executorId))
}
def blockIdsToHosts(
blockIds: Array[BlockId],
env: SparkEnv,
- blockManagerMaster: BlockManagerMaster = null)
- : Map[BlockId, Seq[String]] =
- {
+ blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[String]] = {
blockIdsToBlockManagers(blockIds, env, blockManagerMaster).mapValues(s => s.map(_.host))
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index 98cd6e6..be537d7 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -50,7 +50,6 @@ private[spark] class BlockManagerId private (
// DEBUG code
Utils.checkHost(host)
assert (port > 0)
-
host + ":" + port
}
@@ -93,7 +92,7 @@ private[spark] class BlockManagerId private (
private[spark] object BlockManagerId {
/**
- * Returns a [[org.apache.spark.storage.BlockManagerId]] for the given configuraiton.
+ * Returns a [[org.apache.spark.storage.BlockManagerId]] for the given configuration.
*
* @param execId ID of the executor.
* @param host Host name of the block manager.
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index e531467..ed69378 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -28,8 +28,7 @@ import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.AkkaUtils
private[spark]
-class BlockManagerMaster(var driverActor : ActorRef, conf: SparkConf) extends Logging {
-
+class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Logging {
val AKKA_RETRY_ATTEMPTS: Int = conf.getInt("spark.akka.num.retries", 3)
val AKKA_RETRY_INTERVAL_MS: Int = conf.getInt("spark.akka.retry.wait", 3000)
@@ -53,8 +52,7 @@ class BlockManagerMaster(var driverActor : ActorRef, conf: SparkConf) extends Lo
}
/** Register the BlockManager's id with the driver. */
- def registerBlockManager(
- blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+ def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
logInfo("Trying to register BlockManager")
tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
logInfo("Registered BlockManager")
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index a999d76..ff2652b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -28,6 +28,7 @@ import akka.actor.{Actor, ActorRef, Cancellable}
import akka.pattern.ask
import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.scheduler._
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.{AkkaUtils, Utils}
@@ -36,11 +37,11 @@ import org.apache.spark.util.{AkkaUtils, Utils}
* all slaves' block managers.
*/
private[spark]
-class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Actor with Logging {
+class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus: LiveListenerBus)
+ extends Actor with Logging {
// Mapping from block manager id to the block manager's information.
- private val blockManagerInfo =
- new mutable.HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo]
+ private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
// Mapping from executor ID to block manager ID.
private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
@@ -160,6 +161,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
blockLocations.remove(locations)
}
}
+ listenerBus.post(SparkListenerBlockManagerRemoved(blockManagerId))
}
private def expireDeadHosts() {
@@ -217,8 +219,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
private def storageStatus: Array[StorageStatus] = {
blockManagerInfo.map { case(blockManagerId, info) =>
- import collection.JavaConverters._
- StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala.toMap)
+ val blockMap = mutable.Map[BlockId, BlockStatus](info.blocks.toSeq: _*)
+ new StorageStatus(blockManagerId, info.maxMem, blockMap)
}.toArray
}
@@ -233,9 +235,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
case None =>
blockManagerIdByExecutor(id.executorId) = id
}
- blockManagerInfo(id) = new BlockManagerMasterActor.BlockManagerInfo(
- id, System.currentTimeMillis(), maxMemSize, slaveActor)
+ blockManagerInfo(id) =
+ new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor)
}
+ listenerBus.post(SparkListenerBlockManagerAdded(id, maxMemSize))
}
private def updateBlockInfo(
@@ -307,97 +310,96 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
}
-private[spark]
-object BlockManagerMasterActor {
-
- case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
+private[spark] case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
- class BlockManagerInfo(
- val blockManagerId: BlockManagerId,
- timeMs: Long,
- val maxMem: Long,
- val slaveActor: ActorRef)
- extends Logging {
+private[spark] class BlockManagerInfo(
+ val blockManagerId: BlockManagerId,
+ timeMs: Long,
+ val maxMem: Long,
+ val slaveActor: ActorRef)
+ extends Logging {
- private var _lastSeenMs: Long = timeMs
- private var _remainingMem: Long = maxMem
+ private var _lastSeenMs: Long = timeMs
+ private var _remainingMem: Long = maxMem
- // Mapping from block id to its status.
- private val _blocks = new JHashMap[BlockId, BlockStatus]
+ // Mapping from block id to its status.
+ private val _blocks = new JHashMap[BlockId, BlockStatus]
- logInfo("Registering block manager %s with %s RAM".format(
- blockManagerId.hostPort, Utils.bytesToString(maxMem)))
+ logInfo("Registering block manager %s with %s RAM".format(
+ blockManagerId.hostPort, Utils.bytesToString(maxMem)))
- def updateLastSeenMs() {
- _lastSeenMs = System.currentTimeMillis()
- }
+ def updateLastSeenMs() {
+ _lastSeenMs = System.currentTimeMillis()
+ }
- def updateBlockInfo(blockId: BlockId, storageLevel: StorageLevel, memSize: Long,
- diskSize: Long) {
+ def updateBlockInfo(
+ blockId: BlockId,
+ storageLevel: StorageLevel,
+ memSize: Long,
+ diskSize: Long) {
- updateLastSeenMs()
+ updateLastSeenMs()
- if (_blocks.containsKey(blockId)) {
- // The block exists on the slave already.
- val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel
+ if (_blocks.containsKey(blockId)) {
+ // The block exists on the slave already.
+ val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel
- if (originalLevel.useMemory) {
- _remainingMem += memSize
- }
+ if (originalLevel.useMemory) {
+ _remainingMem += memSize
}
+ }
- if (storageLevel.isValid) {
- // isValid means it is either stored in-memory or on-disk.
- // But the memSize here indicates the data size in or dropped from memory,
- // and the diskSize here indicates the data size in or dropped to disk.
- // They can be both larger than 0, when a block is dropped from memory to disk.
- // Therefore, a safe way to set BlockStatus is to set its info in accurate modes.
- if (storageLevel.useMemory) {
- _blocks.put(blockId, BlockStatus(storageLevel, memSize, 0))
- _remainingMem -= memSize
- logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
- Utils.bytesToString(_remainingMem)))
- }
- if (storageLevel.useDisk) {
- _blocks.put(blockId, BlockStatus(storageLevel, 0, diskSize))
- logInfo("Added %s on disk on %s (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
- }
- } else if (_blocks.containsKey(blockId)) {
- // If isValid is not true, drop the block.
- val blockStatus: BlockStatus = _blocks.get(blockId)
- _blocks.remove(blockId)
- if (blockStatus.storageLevel.useMemory) {
- _remainingMem += blockStatus.memSize
- logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.memSize),
- Utils.bytesToString(_remainingMem)))
- }
- if (blockStatus.storageLevel.useDisk) {
- logInfo("Removed %s on %s on disk (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
- }
+ if (storageLevel.isValid) {
+ /* isValid means it is either stored in-memory or on-disk.
+ * But the memSize here indicates the data size in or dropped from memory,
+ * and the diskSize here indicates the data size in or dropped to disk.
+ * They can be both larger than 0, when a block is dropped from memory to disk.
+ * Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
+ if (storageLevel.useMemory) {
+ _blocks.put(blockId, BlockStatus(storageLevel, memSize, 0))
+ _remainingMem -= memSize
+ logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
+ blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
+ Utils.bytesToString(_remainingMem)))
+ }
+ if (storageLevel.useDisk) {
+ _blocks.put(blockId, BlockStatus(storageLevel, 0, diskSize))
+ logInfo("Added %s on disk on %s (size: %s)".format(
+ blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
+ }
+ } else if (_blocks.containsKey(blockId)) {
+ // If isValid is not true, drop the block.
+ val blockStatus: BlockStatus = _blocks.get(blockId)
+ _blocks.remove(blockId)
+ if (blockStatus.storageLevel.useMemory) {
+ _remainingMem += blockStatus.memSize
+ logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
+ blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.memSize),
+ Utils.bytesToString(_remainingMem)))
+ }
+ if (blockStatus.storageLevel.useDisk) {
+ logInfo("Removed %s on %s on disk (size: %s)".format(
+ blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
}
}
+ }
- def removeBlock(blockId: BlockId) {
- if (_blocks.containsKey(blockId)) {
- _remainingMem += _blocks.get(blockId).memSize
- _blocks.remove(blockId)
- }
+ def removeBlock(blockId: BlockId) {
+ if (_blocks.containsKey(blockId)) {
+ _remainingMem += _blocks.get(blockId).memSize
+ _blocks.remove(blockId)
}
+ }
- def remainingMem: Long = _remainingMem
+ def remainingMem: Long = _remainingMem
- def lastSeenMs: Long = _lastSeenMs
+ def lastSeenMs: Long = _lastSeenMs
- def blocks: JHashMap[BlockId, BlockStatus] = _blocks
+ def blocks: JHashMap[BlockId, BlockStatus] = _blocks
- override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
+ override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
- def clear() {
- _blocks.clear()
- }
+ def clear() {
+ _blocks.clear()
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 38836d4..488f1ea 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -49,7 +49,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}
- override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) : PutResult = {
+ override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
// Work on a duplicate - since the original input might be used elsewhere.
val bytes = _bytes.duplicate()
bytes.rewind()
@@ -70,16 +70,15 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
blockId: BlockId,
values: ArrayBuffer[Any],
level: StorageLevel,
- returnValues: Boolean)
- : PutResult = {
+ returnValues: Boolean): PutResult = {
if (level.deserialized) {
val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
- tryToPut(blockId, values, sizeEstimate, true)
- PutResult(sizeEstimate, Left(values.toIterator))
+ val putAttempt = tryToPut(blockId, values, sizeEstimate, deserialized = true)
+ PutResult(sizeEstimate, Left(values.iterator), putAttempt.droppedBlocks)
} else {
- val bytes = blockManager.dataSerialize(blockId, values.toIterator)
- tryToPut(blockId, bytes, bytes.limit, false)
- PutResult(bytes.limit(), Right(bytes.duplicate()))
+ val bytes = blockManager.dataSerialize(blockId, values.iterator)
+ val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false)
+ PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks)
}
}
@@ -87,20 +86,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
- returnValues: Boolean)
- : PutResult = {
-
- if (level.deserialized) {
- val valueEntries = new ArrayBuffer[Any]()
- valueEntries ++= values
- val sizeEstimate = SizeEstimator.estimate(valueEntries.asInstanceOf[AnyRef])
- tryToPut(blockId, valueEntries, sizeEstimate, true)
- PutResult(sizeEstimate, Left(valueEntries.toIterator))
- } else {
- val bytes = blockManager.dataSerialize(blockId, values)
- tryToPut(blockId, bytes, bytes.limit, false)
- PutResult(bytes.limit(), Right(bytes.duplicate()))
- }
+ returnValues: Boolean): PutResult = {
+ val valueEntries = new ArrayBuffer[Any]()
+ valueEntries ++= values
+ putValues(blockId, valueEntries, level, returnValues)
}
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
@@ -164,19 +153,34 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
* size must also be passed by the caller.
*
- * Locks on the object putLock to ensure that all the put requests and its associated block
+ * Lock on the object putLock to ensure that all the put requests and its associated block
* dropping is done by only on thread at a time. Otherwise while one thread is dropping
* blocks to free memory for one block, another thread may use up the freed space for
* another block.
+ *
+ * Return whether put was successful, along with the blocks dropped in the process.
*/
- private def tryToPut(blockId: BlockId, value: Any, size: Long, deserialized: Boolean): Boolean = {
- // TODO: Its possible to optimize the locking by locking entries only when selecting blocks
- // to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has been
- // released, it must be ensured that those to-be-dropped blocks are not double counted for
- // freeing up more space for another block that needs to be put. Only then the actually dropping
- // of blocks (and writing to disk if necessary) can proceed in parallel.
+ private def tryToPut(
+ blockId: BlockId,
+ value: Any,
+ size: Long,
+ deserialized: Boolean): ResultWithDroppedBlocks = {
+
+ /* TODO: It's possible to optimize the locking by locking entries only when selecting blocks
+ * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
+ * been released, it must be ensured that those to-be-dropped blocks are not double counted
+ * for freeing up more space for another block that needs to be put. Only then the actually
+ * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */
+
+ var putSuccess = false
+ val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+
putLock.synchronized {
- if (ensureFreeSpace(blockId, size)) {
+ val freeSpaceResult = ensureFreeSpace(blockId, size)
+ val enoughFreeSpace = freeSpaceResult.success
+ droppedBlocks ++= freeSpaceResult.droppedBlocks
+
+ if (enoughFreeSpace) {
val entry = new Entry(value, size, deserialized)
entries.synchronized {
entries.put(blockId, entry)
@@ -189,7 +193,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
logInfo("Block %s stored as bytes to memory (size %s, free %s)".format(
blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
}
- true
+ putSuccess = true
} else {
// Tell the block manager that we couldn't put it in memory so that it can drop it to
// disk if the block allows disk storage.
@@ -198,29 +202,33 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
} else {
Right(value.asInstanceOf[ByteBuffer].duplicate())
}
- blockManager.dropFromMemory(blockId, data)
- false
+ val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
+ droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
}
}
+ ResultWithDroppedBlocks(putSuccess, droppedBlocks)
}
/**
- * Tries to free up a given amount of space to store a particular block, but can fail and return
- * false if either the block is bigger than our memory or it would require replacing another
- * block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
+ * Try to free up a given amount of space to store a particular block, but can fail if
+ * either the block is bigger than our memory or it would require replacing another block
+ * from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
* don't fit into memory that we want to avoid).
*
- * Assumes that a lock is held by the caller to ensure only one thread is dropping blocks.
+ * Assume that a lock is held by the caller to ensure only one thread is dropping blocks.
* Otherwise, the freed space may fill up before the caller puts in their new value.
+ *
+ * Return whether there is enough free space, along with the blocks dropped in the process.
*/
- private def ensureFreeSpace(blockIdToAdd: BlockId, space: Long): Boolean = {
-
+ private def ensureFreeSpace(blockIdToAdd: BlockId, space: Long): ResultWithDroppedBlocks = {
logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
space, currentMemory, maxMemory))
+ val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+
if (space > maxMemory) {
logInfo("Will not store " + blockIdToAdd + " as it is larger than our memory limit")
- return false
+ return ResultWithDroppedBlocks(success = false, droppedBlocks)
}
if (maxMemory - currentMemory < space) {
@@ -256,17 +264,18 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
} else {
Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
}
- blockManager.dropFromMemory(blockId, data)
+ val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
+ droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
}
}
- return true
+ return ResultWithDroppedBlocks(success = true, droppedBlocks)
} else {
logInfo(s"Will not store $blockIdToAdd as it would require dropping another block " +
"from the same RDD")
- return false
+ return ResultWithDroppedBlocks(success = false, droppedBlocks)
}
}
- true
+ ResultWithDroppedBlocks(success = true, droppedBlocks)
}
override def contains(blockId: BlockId): Boolean = {
@@ -274,3 +283,6 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}
+private case class ResultWithDroppedBlocks(
+ success: Boolean,
+ droppedBlocks: Seq[(BlockId, BlockStatus)])
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/storage/PutResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/PutResult.scala b/core/src/main/scala/org/apache/spark/storage/PutResult.scala
index 2eba2f0..f0eac75 100644
--- a/core/src/main/scala/org/apache/spark/storage/PutResult.scala
+++ b/core/src/main/scala/org/apache/spark/storage/PutResult.scala
@@ -20,7 +20,13 @@ package org.apache.spark.storage
import java.nio.ByteBuffer
/**
- * Result of adding a block into a BlockStore. Contains its estimated size, and possibly the
- * values put if the caller asked for them to be returned (e.g. for chaining replication)
+ * Result of adding a block into a BlockStore. This case class contains a few things:
+ * (1) The estimated size of the put,
+ * (2) The values put if the caller asked for them to be returned (e.g. for chaining
+ * replication), and
+ * (3) A list of blocks dropped as a result of this put. This is always empty for DiskStore.
*/
-private[spark] case class PutResult(size: Long, data: Either[Iterator[_], ByteBuffer])
+private[spark] case class PutResult(
+ size: Long,
+ data: Either[Iterator[_], ByteBuffer],
+ droppedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty)
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
new file mode 100644
index 0000000..26565f5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.collection.mutable
+
+import org.apache.spark.scheduler._
+
+/**
+ * A SparkListener that maintains executor storage status.
+ *
+ * Storage statuses are kept in a map keyed by executor ID. The event handlers mutate that map
+ * while holding this object's monitor, and `storageStatusList` takes the same monitor so that
+ * a reader does not traverse the map while a handler is changing it.
+ */
+private[spark] class StorageStatusListener extends SparkListener {
+  // Executor ID -> storage status of the block manager registered for that executor
+  private val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
+
+  /** Return the storage statuses of all block managers currently tracked by this listener. */
+  def storageStatusList: Seq[StorageStatus] = synchronized {
+    executorIdToStorageStatus.values.toSeq
+  }
+
+  /**
+   * Update storage status list to reflect updated block statuses.
+   *
+   * @param execId ID of the executor whose blocks changed
+   * @param updatedBlocks the new status of each changed block
+   */
+  def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
+    // The map is keyed by executor ID, so look the status up directly instead of scanning
+    // every value. Updates for an executor that has no tracked block manager are ignored.
+    executorIdToStorageStatus.get(execId).foreach { storageStatus =>
+      updatedBlocks.foreach { case (blockId, updatedStatus) =>
+        storageStatus.blocks(blockId) = updatedStatus
+      }
+    }
+  }
+
+  /**
+   * Update storage status list to reflect the removal of an RDD from the cache.
+   *
+   * Blocks of the unpersisted RDD are not deleted from the status; they are overwritten with
+   * an empty status (StorageLevel.NONE, zero memory and disk size).
+   */
+  def updateStorageStatus(unpersistedRDDId: Int) {
+    storageStatusList.foreach { storageStatus =>
+      val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId)
+      unpersistedBlocksIds.foreach { blockId =>
+        storageStatus.blocks(blockId) = BlockStatus(StorageLevel.NONE, 0L, 0L)
+      }
+    }
+  }
+
+  /** Apply the block updates reported in the metrics of a finished task, if there are any. */
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
+    val info = taskEnd.taskInfo
+    val metrics = taskEnd.taskMetrics
+    // Either field may be null; in that case there is nothing to record for this task
+    if (info != null && metrics != null) {
+      val execId = formatExecutorId(info.executorId)
+      val updatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
+      if (updatedBlocks.nonEmpty) {
+        updateStorageStatus(execId, updatedBlocks)
+      }
+    }
+  }
+
+  /** Mark all blocks of the unpersisted RDD as no longer stored. */
+  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = synchronized {
+    updateStorageStatus(unpersistRDD.rddId)
+  }
+
+  /** Start tracking a block manager; a fresh status replaces any existing one for its executor. */
+  override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded) {
+    synchronized {
+      val blockManagerId = blockManagerAdded.blockManagerId
+      val executorId = blockManagerId.executorId
+      val maxMem = blockManagerAdded.maxMem
+      val storageStatus = new StorageStatus(blockManagerId, maxMem)
+      executorIdToStorageStatus(executorId) = storageStatus
+    }
+  }
+
+  /** Stop tracking the block manager of the removed executor. */
+  override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) {
+    synchronized {
+      val executorId = blockManagerRemoved.blockManagerId.executorId
+      executorIdToStorageStatus.remove(executorId)
+    }
+  }
+
+  /**
+   * In the local mode, there is a discrepancy between the executor ID according to the
+   * task ("localhost") and that according to SparkEnv ("<driver>"). In the UI, this
+   * results in duplicate rows for the same executor. Thus, in this mode, we aggregate
+   * these two rows and use the executor ID of "<driver>" to be consistent.
+   */
+  def formatExecutorId(execId: String): String = {
+    if (execId == "localhost") "<driver>" else execId
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 2d88a40..6153dfe 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -17,13 +17,17 @@
package org.apache.spark.storage
+import scala.collection.Map
+import scala.collection.mutable
+
import org.apache.spark.SparkContext
-import org.apache.spark.storage.BlockManagerMasterActor.BlockStatus
import org.apache.spark.util.Utils
private[spark]
-case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
- blocks: Map[BlockId, BlockStatus]) {
+class StorageStatus(
+ val blockManagerId: BlockManagerId,
+ val maxMem: Long,
+ val blocks: mutable.Map[BlockId, BlockStatus] = mutable.Map.empty) {
def memUsed() = blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
@@ -43,14 +47,18 @@ case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
}
}
-case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
- numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long)
+private[spark]
+class RDDInfo(val id: Int, val name: String, val numPartitions: Int, val storageLevel: StorageLevel)
extends Ordered[RDDInfo] {
+
+ var numCachedPartitions = 0
+ var memSize = 0L
+ var diskSize = 0L
+
override def toString = {
- import Utils.bytesToString
("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; " +
"DiskSize: %s").format(name, id, storageLevel.toString, numCachedPartitions,
- numPartitions, bytesToString(memSize), bytesToString(diskSize))
+ numPartitions, Utils.bytesToString(memSize), Utils.bytesToString(diskSize))
}
override def compare(that: RDDInfo) = {
@@ -62,55 +70,76 @@ case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
private[spark]
object StorageUtils {
- /* Returns RDD-level information, compiled from a list of StorageStatus objects */
- def rddInfoFromStorageStatus(storageStatusList: Seq[StorageStatus],
- sc: SparkContext) : Array[RDDInfo] = {
- rddInfoFromBlockStatusList(
- storageStatusList.flatMap(_.rddBlocks).toMap[RDDBlockId, BlockStatus], sc)
+ /**
+ * Returns basic information of all RDDs persisted in the given SparkContext. This does not
+ * include storage information.
+ */
+ def rddInfoFromSparkContext(sc: SparkContext): Array[RDDInfo] = {
+ sc.persistentRdds.values.map { rdd =>
+ val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
+ val rddNumPartitions = rdd.partitions.size
+ val rddStorageLevel = rdd.getStorageLevel
+ val rddInfo = new RDDInfo(rdd.id, rddName, rddNumPartitions, rddStorageLevel)
+ rddInfo
+ }.toArray
}
- /* Returns a map of blocks to their locations, compiled from a list of StorageStatus objects */
- def blockLocationsFromStorageStatus(storageStatusList: Seq[StorageStatus]) = {
- val blockLocationPairs = storageStatusList
- .flatMap(s => s.blocks.map(b => (b._1, s.blockManagerId.hostPort)))
- blockLocationPairs.groupBy(_._1).map{case (k, v) => (k, v.unzip._2)}.toMap
+ /** Returns storage information of all RDDs persisted in the given SparkContext. */
+ def rddInfoFromStorageStatus(
+ storageStatuses: Seq[StorageStatus],
+ sc: SparkContext): Array[RDDInfo] = {
+ rddInfoFromStorageStatus(storageStatuses, rddInfoFromSparkContext(sc))
}
- /* Given a list of BlockStatus objets, returns information for each RDD */
- def rddInfoFromBlockStatusList(infos: Map[RDDBlockId, BlockStatus],
- sc: SparkContext) : Array[RDDInfo] = {
+ /** Returns storage information of all RDDs in the given list. */
+ def rddInfoFromStorageStatus(
+ storageStatuses: Seq[StorageStatus],
+ rddInfos: Seq[RDDInfo]): Array[RDDInfo] = {
+
+ // Mapping from RDD ID -> an array of associated BlockStatuses
+ val blockStatusMap = storageStatuses.flatMap(_.rddBlocks).toMap
+ .groupBy { case (k, _) => k.rddId }
+ .mapValues(_.values.toArray)
- // Group by rddId, ignore the partition name
- val groupedRddBlocks = infos.groupBy { case(k, v) => k.rddId }.mapValues(_.values.toArray)
+ // Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information)
+ val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap
- // For each RDD, generate an RDDInfo object
- val rddInfos = groupedRddBlocks.map { case (rddId, rddBlocks) =>
+ val rddStorageInfos = blockStatusMap.flatMap { case (rddId, blocks) =>
// Add up memory and disk sizes
- val memSize = rddBlocks.map(_.memSize).reduce(_ + _)
- val diskSize = rddBlocks.map(_.diskSize).reduce(_ + _)
-
- // Get the friendly name and storage level for the RDD, if available
- sc.persistentRdds.get(rddId).map { r =>
- val rddName = Option(r.name).getOrElse(rddId.toString)
- val rddStorageLevel = r.getStorageLevel
- RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, r.partitions.size,
- memSize, diskSize)
+ val persistedBlocks = blocks.filter { status => status.memSize + status.diskSize > 0 }
+ val memSize = persistedBlocks.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
+ val diskSize = persistedBlocks.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
+ rddInfoMap.get(rddId).map { rddInfo =>
+ rddInfo.numCachedPartitions = persistedBlocks.length
+ rddInfo.memSize = memSize
+ rddInfo.diskSize = diskSize
+ rddInfo
}
- }.flatten.toArray
+ }.toArray
- scala.util.Sorting.quickSort(rddInfos)
-
- rddInfos
+ scala.util.Sorting.quickSort(rddStorageInfos)
+ rddStorageInfos
}
- /* Filters storage status by a given RDD id. */
- def filterStorageStatusByRDD(storageStatusList: Array[StorageStatus], rddId: Int)
- : Array[StorageStatus] = {
-
- storageStatusList.map { status =>
- val newBlocks = status.rddBlocks.filterKeys(_.rddId == rddId).toMap[BlockId, BlockStatus]
- //val newRemainingMem = status.maxMem - newBlocks.values.map(_.memSize).reduce(_ + _)
- StorageStatus(status.blockManagerId, status.maxMem, newBlocks)
+  /**
+   * Returns a mapping from BlockId to the locations of the associated block, i.e. the
+   * `hostPort` of every block manager in the given statuses that reports the block.
+   */
+  def blockLocationsFromStorageStatus(
+      storageStatuses: Seq[StorageStatus]): Map[BlockId, Seq[String]] = {
+    val blockLocationPairs = storageStatuses.flatMap { storageStatus =>
+      storageStatus.blocks.map { case (bid, _) => (bid, storageStatus.blockManagerId.hostPort) }
+    }
+    // Group the pairs directly. Converting them to a Map first would keep only one
+    // (blockId, location) pair per block and silently drop every other location.
+    blockLocationPairs
+      .groupBy { case (blockId, _) => blockId }
+      .mapValues(_.map { case (_, location) => location })
+  }
+
+ /** Filters the given list of StorageStatus by the given RDD ID. */
+ def filterStorageStatusByRDD(
+ storageStatuses: Seq[StorageStatus],
+ rddId: Int): Array[StorageStatus] = {
+ storageStatuses.map { status =>
+ val filteredBlocks = status.rddBlocks.filterKeys(_.rddId == rddId).toSeq
+ val filteredBlockMap = mutable.Map[BlockId, BlockStatus](filteredBlocks: _*)
+ new StorageStatus(status.blockManagerId, status.maxMem, filteredBlockMap)
+ }.toArray
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index 36f2a0f..226ed2a 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -22,9 +22,9 @@ import java.util.concurrent.ArrayBlockingQueue
import akka.actor._
import util.Random
-import org.apache.spark.SparkConf
-import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.serializer.KryoSerializer
/**
* This class tests the BlockManager and MemoryStore for thread safety and
@@ -97,7 +97,8 @@ private[spark] object ThreadingTest {
val conf = new SparkConf()
val serializer = new KryoSerializer(conf)
val blockManagerMaster = new BlockManagerMaster(
- actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
+ actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
+ conf)
val blockManager = new BlockManager(
"<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf,
new SecurityManager(conf))
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index e0555ca..6e1736f 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -17,30 +17,29 @@
package org.apache.spark.ui
-import java.net.InetSocketAddress
-import java.net.URL
-import javax.servlet.http.{HttpServlet, HttpServletResponse, HttpServletRequest}
+import java.net.{InetSocketAddress, URL}
+import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}
import scala.xml.Node
-import org.json4s.JValue
-import org.json4s.jackson.JsonMethods.{pretty, render}
-
import org.eclipse.jetty.server.{DispatcherType, Server}
-import org.eclipse.jetty.server.handler.HandlerList
-import org.eclipse.jetty.servlet.{DefaultServlet, FilterHolder, ServletContextHandler, ServletHolder}
+import org.eclipse.jetty.server.handler._
+import org.eclipse.jetty.servlet._
import org.eclipse.jetty.util.thread.QueuedThreadPool
+import org.json4s.JValue
+import org.json4s.jackson.JsonMethods.{pretty, render}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
-
-/** Utilities for launching a web server using Jetty's HTTP Server class */
+/**
+ * Utilities for launching a web server using Jetty's HTTP Server class
+ */
private[spark] object JettyUtils extends Logging {
+
// Base type for a function that returns something based on an HTTP request. Allows for
// implicit conversion from many types of functions to jetty Handlers.
-
type Responder[T] = HttpServletRequest => T
class ServletParams[T <% AnyRef](val responder: Responder[T],
@@ -57,62 +56,73 @@ private[spark] object JettyUtils extends Logging {
implicit def textResponderToServlet(responder: Responder[String]): ServletParams[String] =
new ServletParams(responder, "text/plain")
- def createServlet[T <% AnyRef](servletParams: ServletParams[T],
+ def createServlet[T <% AnyRef](
+ servletParams: ServletParams[T],
securityMgr: SecurityManager): HttpServlet = {
new HttpServlet {
- override def doGet(request: HttpServletRequest,
- response: HttpServletResponse) {
- if (securityMgr.checkUIViewPermissions(request.getRemoteUser())) {
+ override def doGet(request: HttpServletRequest, response: HttpServletResponse) {
+ if (securityMgr.checkUIViewPermissions(request.getRemoteUser)) {
response.setContentType("%s;charset=utf-8".format(servletParams.contentType))
response.setStatus(HttpServletResponse.SC_OK)
val result = servletParams.responder(request)
response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
- response.getWriter().println(servletParams.extractFn(result))
+ response.getWriter.println(servletParams.extractFn(result))
} else {
response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
response.sendError(HttpServletResponse.SC_UNAUTHORIZED,
- "User is not authorized to access this page.");
+ "User is not authorized to access this page.")
}
}
}
}
- def createServletHandler(path: String, servlet: HttpServlet): ServletContextHandler = {
- val contextHandler = new ServletContextHandler()
+ /** Create a context handler that responds to a request with the given path prefix */
+ def createServletHandler[T <% AnyRef](
+ path: String,
+ servletParams: ServletParams[T],
+ securityMgr: SecurityManager,
+ basePath: String = ""): ServletContextHandler = {
+ createServletHandler(path, createServlet(servletParams, securityMgr), basePath)
+ }
+
+ /** Create a context handler that responds to a request with the given path prefix */
+ def createServletHandler(
+ path: String,
+ servlet: HttpServlet,
+ basePath: String = ""): ServletContextHandler = {
+ val prefixedPath = attachPrefix(basePath, path)
+ val contextHandler = new ServletContextHandler
val holder = new ServletHolder(servlet)
- contextHandler.setContextPath(path)
+ contextHandler.setContextPath(prefixedPath)
contextHandler.addServlet(holder, "/")
contextHandler
}
- /** Creates a handler that always redirects the user to a given path */
- def createRedirectHandler(newPath: String, path: String): ServletContextHandler = {
+ /** Create a handler that always redirects the user to the given path */
+ def createRedirectHandler(
+ srcPath: String,
+ destPath: String,
+ basePath: String = ""): ServletContextHandler = {
+ val prefixedDestPath = attachPrefix(basePath, destPath)
val servlet = new HttpServlet {
- override def doGet(request: HttpServletRequest,
- response: HttpServletResponse) {
- // make sure we don't end up with // in the middle
- val newUri = new URL(new URL(request.getRequestURL.toString), newPath).toURI
- response.sendRedirect(newUri.toString)
+ override def doGet(request: HttpServletRequest, response: HttpServletResponse) {
+ // Make sure we don't end up with "//" in the middle
+ val newUrl = new URL(new URL(request.getRequestURL.toString), prefixedDestPath).toString
+ response.sendRedirect(newUrl)
}
}
- val contextHandler = new ServletContextHandler()
- val holder = new ServletHolder(servlet)
- contextHandler.setContextPath(path)
- contextHandler.addServlet(holder, "/")
- contextHandler
+ createServletHandler(srcPath, servlet, basePath)
}
- /** Creates a handler for serving files from a static directory */
+ /** Create a handler for serving files from a static directory */
def createStaticHandler(resourceBase: String, path: String): ServletContextHandler = {
- val contextHandler = new ServletContextHandler()
+ val contextHandler = new ServletContextHandler
val staticHandler = new DefaultServlet
val holder = new ServletHolder(staticHandler)
Option(getClass.getClassLoader.getResource(resourceBase)) match {
case Some(res) =>
holder.setInitParameter("resourceBase", res.toString)
- holder.setInitParameter("welcomeServlets", "false")
- holder.setInitParameter("pathInfoOnly", "false")
case None =>
throw new Exception("Could not find resource path for Web UI: " + resourceBase)
}
@@ -121,6 +131,7 @@ private[spark] object JettyUtils extends Logging {
contextHandler
}
+ /** Add security filters, if any, to the given list of ServletContextHandlers */
private def addFilters(handlers: Seq[ServletContextHandler], conf: SparkConf) {
val filters: Array[String] = conf.get("spark.ui.filters", "").split(',').map(_.trim())
filters.foreach {
@@ -129,7 +140,7 @@ private[spark] object JettyUtils extends Logging {
logInfo("Adding filter: " + filter)
val holder : FilterHolder = new FilterHolder()
holder.setClassName(filter)
- // get any parameters for each filter
+ // Get any parameters for each filter
val paramName = "spark." + filter + ".params"
val params = conf.get(paramName, "").split(',').map(_.trim()).toSet
params.foreach {
@@ -147,18 +158,21 @@ private[spark] object JettyUtils extends Logging {
}
/**
- * Attempts to start a Jetty server at the supplied hostName:port which uses the supplied
- * handlers.
+ * Attempt to start a Jetty server bound to the supplied hostName:port using the given
+ * context handlers.
*
- * If the desired port number is contented, continues incrementing ports until a free port is
- * found. Returns the chosen port and the jetty Server object.
+ * If the desired port number is contended, continues incrementing ports until a free port is
+ * found. Return the jetty Server object, the chosen port, and a mutable collection of handlers.
*/
- def startJettyServer(hostName: String, port: Int, handlers: Seq[ServletContextHandler],
- conf: SparkConf): (Server, Int) = {
-
+ def startJettyServer(
+ hostName: String,
+ port: Int,
+ handlers: Seq[ServletContextHandler],
+ conf: SparkConf): ServerInfo = {
+
+ val collection = new ContextHandlerCollection
+ collection.setHandlers(handlers.toArray)
addFilters(handlers, conf)
- val handlerList = new HandlerList
- handlerList.setHandlers(handlers.toArray)
@tailrec
def connect(currentPort: Int): (Server, Int) = {
@@ -166,7 +180,7 @@ private[spark] object JettyUtils extends Logging {
val pool = new QueuedThreadPool
pool.setDaemon(true)
server.setThreadPool(pool)
- server.setHandler(handlerList)
+ server.setHandler(collection)
Try {
server.start()
@@ -181,6 +195,17 @@ private[spark] object JettyUtils extends Logging {
}
}
- connect(port)
+ val (server, boundPort) = connect(port)
+ ServerInfo(server, boundPort, collection)
+ }
+
+ /** Attach a prefix to the given path, but avoid returning an empty path */
+ private def attachPrefix(basePath: String, relativePath: String): String = {
+ if (basePath == "") relativePath else (basePath + relativePath).stripSuffix("/")
}
}
+
+private[spark] case class ServerInfo(
+ server: Server,
+ boundPort: Int,
+ rootHandler: ContextHandlerCollection)
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 5f0dee6..fd638c8 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -17,12 +17,11 @@
package org.apache.spark.ui
-import javax.servlet.http.HttpServletRequest
-
-import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.ServletContextHandler
-import org.apache.spark.{Logging, SparkContext, SparkEnv}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.scheduler._
+import org.apache.spark.storage.StorageStatusListener
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.ui.env.EnvironmentUI
import org.apache.spark.ui.exec.ExecutorsUI
@@ -31,34 +30,57 @@ import org.apache.spark.ui.storage.BlockManagerUI
import org.apache.spark.util.Utils
/** Top level user interface for Spark */
-private[spark] class SparkUI(sc: SparkContext) extends Logging {
- val host = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(Utils.localHostName())
- val port = sc.conf.get("spark.ui.port", SparkUI.DEFAULT_PORT).toInt
- var boundPort: Option[Int] = None
- var server: Option[Server] = None
-
- val handlers = Seq[ServletContextHandler] (
- createStaticHandler(SparkUI.STATIC_RESOURCE_DIR + "/static", "/static"),
- createRedirectHandler("/stages", "/")
- )
- val storage = new BlockManagerUI(sc)
- val jobs = new JobProgressUI(sc)
- val env = new EnvironmentUI(sc)
- val exec = new ExecutorsUI(sc)
-
- // Add MetricsServlet handlers by default
- val metricsServletHandlers = SparkEnv.get.metricsSystem.getServletHandlers
-
- val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++
- exec.getHandlers ++ metricsServletHandlers ++ handlers
+private[spark] class SparkUI(
+ val sc: SparkContext,
+ conf: SparkConf,
+ val listenerBus: SparkListenerBus,
+ val appName: String,
+ val basePath: String = "")
+ extends Logging {
+
+ def this(sc: SparkContext) = this(sc, sc.conf, sc.listenerBus, sc.appName)
+ def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) =
+ this(null, conf, listenerBus, appName, basePath)
+
+ // If SparkContext is not provided, assume the associated application is not live
+ val live = sc != null
+
+ val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf)
+
+ private val host = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(Utils.localHostName())
+ private val port = conf.get("spark.ui.port", SparkUI.DEFAULT_PORT).toInt
+ private var serverInfo: Option[ServerInfo] = None
+
+ private val storage = new BlockManagerUI(this)
+ private val jobs = new JobProgressUI(this)
+ private val env = new EnvironmentUI(this)
+ private val exec = new ExecutorsUI(this)
+
+ val handlers: Seq[ServletContextHandler] = {
+ val metricsServletHandlers = if (live) {
+ SparkEnv.get.metricsSystem.getServletHandlers
+ } else {
+ Array[ServletContextHandler]()
+ }
+ storage.getHandlers ++
+ jobs.getHandlers ++
+ env.getHandlers ++
+ exec.getHandlers ++
+ metricsServletHandlers ++
+ Seq[ServletContextHandler] (
+ createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"),
+ createRedirectHandler("/", "/stages", basePath)
+ )
+ }
+
+ // Maintain executor storage status through Spark events
+ val storageStatusListener = new StorageStatusListener
/** Bind the HTTP server which backs this web interface */
def bind() {
try {
- val (srv, usedPort) = JettyUtils.startJettyServer(host, port, allHandlers, sc.conf)
- logInfo("Started Spark Web UI at http://%s:%d".format(host, usedPort))
- server = Some(srv)
- boundPort = Some(usedPort)
+ // Use the `conf` constructor argument rather than `sc.conf`: `sc` is null when this UI
+ // is built through the (conf, listenerBus, appName, basePath) constructor.
+ serverInfo = Some(startJettyServer(host, port, handlers, conf))
+ logInfo("Started Spark Web UI at http://%s:%d".format(host, boundPort))
} catch {
case e: Exception =>
logError("Failed to create Spark JettyUtils", e)
@@ -66,25 +88,34 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
}
}
+ /** Port held in `serverInfo`, or -1 while `serverInfo` is still empty. */
+ def boundPort: Int = serverInfo match {
+ case Some(info) => info.boundPort
+ case None => -1
+ }
+
/** Initialize all components of the server */
def start() {
- // NOTE: This is decoupled from bind() because of the following dependency cycle:
- // DAGScheduler() requires that the port of this server is known
- // This server must register all handlers, including JobProgressUI, before binding
- // JobProgressUI registers a listener with SparkContext, which requires sc to initialize
+ storage.start()
jobs.start()
+ env.start()
exec.start()
+
+ // Storage status listener must receive events first, as other listeners depend on its state
+ listenerBus.addListener(storageStatusListener)
+ listenerBus.addListener(storage.listener)
+ listenerBus.addListener(jobs.listener)
+ listenerBus.addListener(env.listener)
+ listenerBus.addListener(exec.listener)
}
def stop() {
- server.foreach(_.stop())
+ // Stopping only makes sense once serverInfo has been populated by bind()
+ val boundServer = serverInfo.map(_.server)
+ assert(boundServer.isDefined, "Attempted to stop a SparkUI that was not bound to a server!")
+ boundServer.get.stop()
+ logInfo("Stopped Spark Web UI at %s".format(appUIAddress))
}
- private[spark] def appUIAddress = host + ":" + boundPort.getOrElse("-1")
+ /** Address of this UI in the form http://host:port, built from `host` and `boundPort`. */
+ private[spark] def appUIAddress = s"http://$host:$boundPort"
}
private[spark] object SparkUI {
val DEFAULT_PORT = "4040"
- val STATIC_RESOURCE_DIR = "org/apache/spark/ui"
+ val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
}
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 547a194..a487924 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -19,38 +19,43 @@ package org.apache.spark.ui
import scala.xml.Node
-import org.apache.spark.SparkContext
-
/** Utility functions for generating XML pages with spark content. */
private[spark] object UIUtils {
+
import Page._
// Yarn has to go through a proxy so the base uri is provided and has to be on all links
private[spark] val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).
getOrElse("")
- def prependBaseUri(resource: String = "") = uiRoot + resource
+ /** Prefix `resource` with `uiRoot` followed by `basePath`; both arguments default to "". */
+ def prependBaseUri(basePath: String = "", resource: String = "") = {
+ s"$uiRoot$basePath$resource"
+ }
/** Returns a spark page with correctly formatted headers */
- def headerSparkPage(content: => Seq[Node], sc: SparkContext, title: String, page: Page.Value)
- : Seq[Node] = {
+ def headerSparkPage(
+ content: => Seq[Node],
+ basePath: String,
+ appName: String,
+ title: String,
+ page: Page.Value) : Seq[Node] = {
val jobs = page match {
- case Stages => <li class="active"><a href={prependBaseUri("/stages")}>Stages</a></li>
- case _ => <li><a href={prependBaseUri("/stages")}>Stages</a></li>
+ case Stages =>
+ <li class="active"><a href={prependBaseUri(basePath, "/stages")}>Stages</a></li>
+ case _ => <li><a href={prependBaseUri(basePath, "/stages")}>Stages</a></li>
}
val storage = page match {
- case Storage => <li class="active"><a href={prependBaseUri("/storage")}>Storage</a></li>
- case _ => <li><a href={prependBaseUri("/storage")}>Storage</a></li>
+ case Storage =>
+ <li class="active"><a href={prependBaseUri(basePath, "/storage")}>Storage</a></li>
+ case _ => <li><a href={prependBaseUri(basePath, "/storage")}>Storage</a></li>
}
val environment = page match {
case Environment =>
- <li class="active"><a href={prependBaseUri("/environment")}>Environment</a></li>
- case _ => <li><a href={prependBaseUri("/environment")}>Environment</a></li>
+ <li class="active"><a href={prependBaseUri(basePath, "/environment")}>Environment</a></li>
+ case _ => <li><a href={prependBaseUri(basePath, "/environment")}>Environment</a></li>
}
val executors = page match {
case Executors =>
- <li class="active"><a href={prependBaseUri("/executors")}>Executors</a></li>
- case _ => <li><a href={prependBaseUri("/executors")}>Executors</a></li>
+ <li class="active"><a href={prependBaseUri(basePath, "/executors")}>Executors</a></li>
+ case _ => <li><a href={prependBaseUri(basePath, "/executors")}>Executors</a></li>
}
<html>
@@ -58,14 +63,15 @@ private[spark] object UIUtils {
<meta http-equiv="Content-type" content="text/html; charset=utf-8" />
<link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")}
type="text/css" />
- <link rel="stylesheet" href={prependBaseUri("/static/webui.css")} type="text/css" />
+ <link rel="stylesheet" href={prependBaseUri("/static/webui.css")}
+ type="text/css" />
<script src={prependBaseUri("/static/sorttable.js")} ></script>
- <title>{sc.appName} - {title}</title>
+ <title>{appName} - {title}</title>
</head>
<body>
<div class="navbar navbar-static-top">
<div class="navbar-inner">
- <a href={prependBaseUri("/")} class="brand">
+ <a href={prependBaseUri(basePath, "/")} class="brand">
<img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} />
</a>
<ul class="nav">
@@ -74,7 +80,7 @@ private[spark] object UIUtils {
{environment}
{executors}
</ul>
- <p class="navbar-text pull-right"><strong>{sc.appName}</strong> application UI</p>
+ <p class="navbar-text pull-right"><strong>{appName}</strong> application UI</p>
</div>
</div>
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
index 1433347..23e90c3 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
@@ -19,76 +19,74 @@ package org.apache.spark.ui.env
import javax.servlet.http.HttpServletRequest
-import scala.collection.JavaConversions._
-import scala.util.Properties
import scala.xml.Node
import org.eclipse.jetty.servlet.ServletContextHandler
-import org.apache.spark.SparkContext
+import org.apache.spark.scheduler._
+import org.apache.spark.ui._
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.ui.Page.Environment
-import org.apache.spark.ui.UIUtils
-private[spark] class EnvironmentUI(sc: SparkContext) {
+private[ui] class EnvironmentUI(parent: SparkUI) {
+ private val appName = parent.appName
+ private val basePath = parent.basePath
+ private var _listener: Option[EnvironmentListener] = None
+
+ lazy val listener = _listener.get
+
+ def start() {
+ _listener = Some(new EnvironmentListener)
+ }
def getHandlers = Seq[ServletContextHandler](
createServletHandler("/environment",
- createServlet((request: HttpServletRequest) => envDetails(request), sc.env.securityManager))
+ (request: HttpServletRequest) => render(request), parent.securityManager, basePath)
)
- def envDetails(request: HttpServletRequest): Seq[Node] = {
- val jvmInformation = Seq(
- ("Java Version", "%s (%s)".format(Properties.javaVersion, Properties.javaVendor)),
- ("Java Home", Properties.javaHome),
- ("Scala Version", Properties.versionString),
- ("Scala Home", Properties.scalaHome)
- ).sorted
- def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
- def jvmTable =
- UIUtils.listingTable(Seq("Name", "Value"), jvmRow, jvmInformation, fixedWidth = true)
-
- val sparkProperties = sc.conf.getAll.sorted
-
- val systemProperties = System.getProperties.iterator.toSeq
- val classPathProperty = systemProperties.find { case (k, v) =>
- k == "java.class.path"
- }.getOrElse(("", ""))
- val otherProperties = systemProperties.filter { case (k, v) =>
- k != "java.class.path" && !k.startsWith("spark.")
- }.sorted
-
- val propertyHeaders = Seq("Name", "Value")
- def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
- val sparkPropertyTable =
- UIUtils.listingTable(propertyHeaders, propertyRow, sparkProperties, fixedWidth = true)
- val otherPropertyTable =
- UIUtils.listingTable(propertyHeaders, propertyRow, otherProperties, fixedWidth = true)
-
- val classPathEntries = classPathProperty._2
- .split(sc.conf.get("path.separator", ":"))
- .filterNot(e => e.isEmpty)
- .map(e => (e, "System Classpath"))
- val addedJars = sc.addedJars.iterator.toSeq.map{case (path, time) => (path, "Added By User")}
- val addedFiles = sc.addedFiles.iterator.toSeq.map{case (path, time) => (path, "Added By User")}
- val classPath = (addedJars ++ addedFiles ++ classPathEntries).sorted
-
- val classPathHeaders = Seq("Resource", "Source")
- def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
- val classPathTable =
- UIUtils.listingTable(classPathHeaders, classPathRow, classPath, fixedWidth = true)
-
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val runtimeInformationTable = UIUtils.listingTable(
+ propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true)
+ val sparkPropertiesTable = UIUtils.listingTable(
+ propertyHeader, propertyRow, listener.sparkProperties, fixedWidth = true)
+ val systemPropertiesTable = UIUtils.listingTable(
+ propertyHeader, propertyRow, listener.systemProperties, fixedWidth = true)
+ val classpathEntriesTable = UIUtils.listingTable(
+ classPathHeaders, classPathRow, listener.classpathEntries, fixedWidth = true)
val content =
<span>
- <h4>Runtime Information</h4> {jvmTable}
- <h4>Spark Properties</h4>
- {sparkPropertyTable}
- <h4>System Properties</h4>
- {otherPropertyTable}
- <h4>Classpath Entries</h4>
- {classPathTable}
+ <h4>Runtime Information</h4> {runtimeInformationTable}
+ <h4>Spark Properties</h4> {sparkPropertiesTable}
+ <h4>System Properties</h4> {systemPropertiesTable}
+ <h4>Classpath Entries</h4> {classpathEntriesTable}
</span>
- UIUtils.headerSparkPage(content, sc, "Environment", Environment)
+ UIUtils.headerSparkPage(content, basePath, appName, "Environment", Environment)
+ }
+
+ private def propertyHeader = Seq("Name", "Value")
+ private def classPathHeaders = Seq("Resource", "Source")
+ private def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
+ private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
+ private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
+}
+
+/**
+ * SparkListener that keeps the most recent environment details for the EnvironmentUI page.
+ * All four tables start out empty and are replaced on every environment update event.
+ */
+private[ui] class EnvironmentListener extends SparkListener {
+ var jvmInformation: Seq[(String, String)] = Seq.empty
+ var sparkProperties: Seq[(String, String)] = Seq.empty
+ var systemProperties: Seq[(String, String)] = Seq.empty
+ var classpathEntries: Seq[(String, String)] = Seq.empty
+
+ override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
+ // Replace all four tables while holding this listener's monitor
+ this.synchronized {
+ val details = environmentUpdate.environmentDetails
+ jvmInformation = details("JVM Information")
+ sparkProperties = details("Spark Properties")
+ systemProperties = details("System Properties")
+ classpathEntries = details("Classpath Entries")
+ }
}
}