Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:58:59 UTC
[15/69] [abbrv] [partial] Initial work to rename package to org.apache.spark
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
deleted file mode 100644
index 2a6ec2a..0000000
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ /dev/null
@@ -1,1046 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.storage
-
-import java.io.{InputStream, OutputStream}
-import java.nio.{ByteBuffer, MappedByteBuffer}
-
-import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet}
-
-import akka.actor.{ActorSystem, Cancellable, Props}
-import akka.dispatch.{Await, Future}
-import akka.util.Duration
-import akka.util.duration._
-
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
-import spark.{Logging, SparkEnv, SparkException, Utils}
-import spark.io.CompressionCodec
-import spark.network._
-import spark.serializer.Serializer
-import spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStampedHashMap}
-
-import sun.nio.ch.DirectBuffer
-
-
-private[spark] class BlockManager(
- executorId: String,
- actorSystem: ActorSystem,
- val master: BlockManagerMaster,
- val defaultSerializer: Serializer,
- maxMemory: Long)
- extends Logging {
-
- private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
- @volatile var pending: Boolean = true
- @volatile var size: Long = -1L
- @volatile var initThread: Thread = null
- @volatile var failed = false
-
- setInitThread()
-
- private def setInitThread() {
- // Set the current thread as the init thread - waitForReady will not block this thread
- // (in case there is non-trivial initialization which ends up calling waitForReady as part
- // of the initialization itself)
- this.initThread = Thread.currentThread()
- }
-
- /**
- * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing).
- * Return true if the block is available, false otherwise.
- */
- def waitForReady(): Boolean = {
- if (initThread != Thread.currentThread() && pending) {
- synchronized {
- while (pending) this.wait()
- }
- }
- !failed
- }
-
- /** Mark this BlockInfo as ready (i.e. block is finished writing) */
- def markReady(sizeInBytes: Long) {
- assert (pending)
- size = sizeInBytes
- failed = false
- initThread = null
- pending = false
- synchronized {
- this.notifyAll()
- }
- }
-
- /** Mark this BlockInfo as ready but failed */
- def markFailure() {
- assert (pending)
- size = 0
- failed = true
- initThread = null
- pending = false
- synchronized {
- this.notifyAll()
- }
- }
- }
-
- val shuffleBlockManager = new ShuffleBlockManager(this)
-
- private val blockInfo = new TimeStampedHashMap[String, BlockInfo]
-
- private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
- private[storage] val diskStore: DiskStore =
- new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
-
- // If we use Netty for shuffle, start a new Netty-based shuffle sender service.
- private val nettyPort: Int = {
- val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean
- val nettyPortConfig = System.getProperty("spark.shuffle.sender.port", "0").toInt
- if (useNetty) diskStore.startShuffleBlockSender(nettyPortConfig) else 0
- }
-
- val connectionManager = new ConnectionManager(0)
- implicit val futureExecContext = connectionManager.futureExecContext
-
- val blockManagerId = BlockManagerId(
- executorId, connectionManager.id.host, connectionManager.id.port, nettyPort)
-
- // Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
- // for receiving shuffle outputs)
- val maxBytesInFlight =
- System.getProperty("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
-
- // Whether to compress broadcast variables that are stored
- val compressBroadcast = System.getProperty("spark.broadcast.compress", "true").toBoolean
- // Whether to compress shuffle outputs that are stored
- val compressShuffle = System.getProperty("spark.shuffle.compress", "true").toBoolean
- // Whether to compress RDD partitions that are stored serialized
- val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean
-
- val heartBeatFrequency = BlockManager.getHeartBeatFrequencyFromSystemProperties
-
- val hostPort = Utils.localHostPort()
-
- val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),
- name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
-
- // Pending reregistration action being executed asynchronously or null if none
- // is pending. Accesses should synchronize on asyncReregisterLock.
- var asyncReregisterTask: Future[Unit] = null
- val asyncReregisterLock = new Object
-
- private def heartBeat() {
- if (!master.sendHeartBeat(blockManagerId)) {
- reregister()
- }
- }
-
- var heartBeatTask: Cancellable = null
-
- val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks)
- initialize()
-
- // The compression codec to use. Note that the "lazy" val is necessary because we want to delay
- // the initialization of the compression codec until it is first used. The reason is that a Spark
- // program could be using a user-defined codec in a third-party jar, which is loaded in
- // Executor.updateDependencies. When the BlockManager is initialized, user-level jars haven't
- // been loaded yet.
- private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec()
-
- /**
- * Construct a BlockManager with a memory limit set based on system properties.
- */
- def this(execId: String, actorSystem: ActorSystem, master: BlockManagerMaster,
- serializer: Serializer) = {
- this(execId, actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
- }
-
- /**
- * Initialize the BlockManager. Register with the BlockManagerMaster, and start the
- * BlockManagerWorker actor.
- */
- private def initialize() {
- master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
- BlockManagerWorker.startBlockManagerWorker(this)
- if (!BlockManager.getDisableHeartBeatsForTesting) {
- heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
- heartBeat()
- }
- }
- }
-
- /**
- * Report all blocks to the BlockManagerMaster again. This may be necessary if we are dropped
- * by the master and come back, or if we become capable of recovering blocks on disk after
- * an executor crash.
- *
- * This function deliberately fails silently if the master returns false (indicating that
- * the slave needs to reregister). The error condition will be detected again by the next
- * heartbeat attempt or new block registration, and another attempt to reregister all blocks
- * will be made then.
- */
- private def reportAllBlocks() {
- logInfo("Reporting " + blockInfo.size + " blocks to the master.")
- for ((blockId, info) <- blockInfo) {
- if (!tryToReportBlockStatus(blockId, info)) {
- logError("Failed to report " + blockId + " to master; giving up.")
- return
- }
- }
- }
-
- /**
- * Reregister with the master and report all blocks to it. This will be called by the heartbeat
- * thread if our heartbeat to the block manager master indicates that we were not registered.
- *
- * Note that this method must be called without any BlockInfo locks held.
- */
- def reregister() {
- // TODO: We might need to rate limit reregistering.
- logInfo("BlockManager reregistering with master")
- master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
- reportAllBlocks()
- }
-
- /**
- * Reregister with the master sometime soon.
- */
- def asyncReregister() {
- asyncReregisterLock.synchronized {
- if (asyncReregisterTask == null) {
- asyncReregisterTask = Future[Unit] {
- reregister()
- asyncReregisterLock.synchronized {
- asyncReregisterTask = null
- }
- }
- }
- }
- }
-
- /**
- * For testing. Wait for any pending asynchronous reregistration; otherwise, do nothing.
- */
- def waitForAsyncReregister() {
- val task = asyncReregisterTask
- if (task != null) {
- Await.ready(task, Duration.Inf)
- }
- }
-
- /**
- * Get storage level of local block. If no info exists for the block, then returns null.
- */
- def getLevel(blockId: String): StorageLevel = blockInfo.get(blockId).map(_.level).orNull
-
- /**
- * Tell the master about the current storage status of a block. This will send a block update
- * message reflecting the current status, *not* the desired storage level in its block info.
- * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
- *
- * droppedMemorySize exists to account for the case when a block is dropped from memory to disk
- * (so it is still valid). This ensures that the master's update compensates for the memory
- * freed on the slave.
- */
- def reportBlockStatus(blockId: String, info: BlockInfo, droppedMemorySize: Long = 0L) {
- val needReregister = !tryToReportBlockStatus(blockId, info, droppedMemorySize)
- if (needReregister) {
- logInfo("Got told to reregister updating block " + blockId)
- // Reregistering will report our new block for free.
- asyncReregister()
- }
- logDebug("Told master about block " + blockId)
- }
-
- /**
- * Actually send an UpdateBlockInfo message. Returns the master's response,
- * which will be true if the block was successfully recorded and false if
- * the slave needs to re-register.
- */
- private def tryToReportBlockStatus(blockId: String, info: BlockInfo, droppedMemorySize: Long = 0L): Boolean = {
- val (curLevel, inMemSize, onDiskSize, tellMaster) = info.synchronized {
- info.level match {
- case null =>
- (StorageLevel.NONE, 0L, 0L, false)
- case level =>
- val inMem = level.useMemory && memoryStore.contains(blockId)
- val onDisk = level.useDisk && diskStore.contains(blockId)
- val storageLevel = StorageLevel(onDisk, inMem, level.deserialized, level.replication)
- val memSize = if (inMem) memoryStore.getSize(blockId) else droppedMemorySize
- val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
- (storageLevel, memSize, diskSize, info.tellMaster)
- }
- }
-
- if (tellMaster) {
- master.updateBlockInfo(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)
- } else {
- true
- }
- }
-
- /**
- * Get locations of an array of blocks.
- */
- def getLocationBlockIds(blockIds: Array[String]): Array[Seq[BlockManagerId]] = {
- val startTimeMs = System.currentTimeMillis
- val locations = master.getLocations(blockIds).toArray
- logDebug("Got multiple block location in " + Utils.getUsedTimeMs(startTimeMs))
- locations
- }
-
- /**
- * A short-circuited method to get blocks directly from disk. This is used for getting
- * shuffle blocks. It is safe to do so without a lock on block info since disk store
- * never deletes (recent) items.
- */
- def getLocalFromDisk(blockId: String, serializer: Serializer): Option[Iterator[Any]] = {
- diskStore.getValues(blockId, serializer).orElse(
- sys.error("Block " + blockId + " not found on disk, though it should be"))
- }
-
- /**
- * Get block from local block manager.
- */
- def getLocal(blockId: String): Option[Iterator[Any]] = {
- logDebug("Getting local block " + blockId)
- val info = blockInfo.get(blockId).orNull
- if (info != null) {
- info.synchronized {
-
- // If another thread is writing the block, wait for it to become ready.
- if (!info.waitForReady()) {
- // If we get here, the block write failed.
- logWarning("Block " + blockId + " was marked as failure.")
- return None
- }
-
- val level = info.level
- logDebug("Level for block " + blockId + " is " + level)
-
- // Look for the block in memory
- if (level.useMemory) {
- logDebug("Getting block " + blockId + " from memory")
- memoryStore.getValues(blockId) match {
- case Some(iterator) =>
- return Some(iterator)
- case None =>
- logDebug("Block " + blockId + " not found in memory")
- }
- }
-
- // Look for block on disk, potentially loading it back into memory if required
- if (level.useDisk) {
- logDebug("Getting block " + blockId + " from disk")
- if (level.useMemory && level.deserialized) {
- diskStore.getValues(blockId) match {
- case Some(iterator) =>
- // Put the block back in memory before returning it
- // TODO: Consider creating a putValues that also takes in an iterator?
- val elements = new ArrayBuffer[Any]
- elements ++= iterator
- memoryStore.putValues(blockId, elements, level, true).data match {
- case Left(iterator2) =>
- return Some(iterator2)
- case _ =>
- throw new Exception("Memory store did not return back an iterator")
- }
- case None =>
- throw new Exception("Block " + blockId + " not found on disk, though it should be")
- }
- } else if (level.useMemory && !level.deserialized) {
- // Read it as a byte buffer into memory first, then return it
- diskStore.getBytes(blockId) match {
- case Some(bytes) =>
- // Put a copy of the block back in memory before returning it. Note that we can't
- // put the ByteBuffer returned by the disk store as that's a memory-mapped file.
- // The use of rewind assumes this.
- assert (0 == bytes.position())
- val copyForMemory = ByteBuffer.allocate(bytes.limit)
- copyForMemory.put(bytes)
- memoryStore.putBytes(blockId, copyForMemory, level)
- bytes.rewind()
- return Some(dataDeserialize(blockId, bytes))
- case None =>
- throw new Exception("Block " + blockId + " not found on disk, though it should be")
- }
- } else {
- diskStore.getValues(blockId) match {
- case Some(iterator) =>
- return Some(iterator)
- case None =>
- throw new Exception("Block " + blockId + " not found on disk, though it should be")
- }
- }
- }
- }
- } else {
- logDebug("Block " + blockId + " not registered locally")
- }
- return None
- }
-
- /**
- * Get block from the local block manager as serialized bytes.
- */
- def getLocalBytes(blockId: String): Option[ByteBuffer] = {
- // TODO: This whole thing is very similar to getLocal; we need to refactor it somehow
- logDebug("Getting local block " + blockId + " as bytes")
-
- // As an optimization for map output fetches, if the block is for a shuffle, return it
- // without acquiring a lock; the disk store never deletes (recent) items so this should work
- if (ShuffleBlockManager.isShuffle(blockId)) {
- return diskStore.getBytes(blockId) match {
- case Some(bytes) =>
- Some(bytes)
- case None =>
- throw new Exception("Block " + blockId + " not found on disk, though it should be")
- }
- }
-
- val info = blockInfo.get(blockId).orNull
- if (info != null) {
- info.synchronized {
-
- // If another thread is writing the block, wait for it to become ready.
- if (!info.waitForReady()) {
- // If we get here, the block write failed.
- logWarning("Block " + blockId + " was marked as failure.")
- return None
- }
-
- val level = info.level
- logDebug("Level for block " + blockId + " is " + level)
-
- // Look for the block in memory
- if (level.useMemory) {
- logDebug("Getting block " + blockId + " from memory")
- memoryStore.getBytes(blockId) match {
- case Some(bytes) =>
- return Some(bytes)
- case None =>
- logDebug("Block " + blockId + " not found in memory")
- }
- }
-
- // Look for block on disk
- if (level.useDisk) {
- // Read it as a byte buffer into memory first, then return it
- diskStore.getBytes(blockId) match {
- case Some(bytes) =>
- assert (0 == bytes.position())
- if (level.useMemory) {
- if (level.deserialized) {
- memoryStore.putBytes(blockId, bytes, level)
- } else {
- // The memory store will hang onto the ByteBuffer, so give it a copy instead of
- // the memory-mapped file buffer we got from the disk store
- val copyForMemory = ByteBuffer.allocate(bytes.limit)
- copyForMemory.put(bytes)
- memoryStore.putBytes(blockId, copyForMemory, level)
- }
- }
- bytes.rewind()
- return Some(bytes)
- case None =>
- throw new Exception("Block " + blockId + " not found on disk, though it should be")
- }
- }
- }
- } else {
- logDebug("Block " + blockId + " not registered locally")
- }
- return None
- }
-
- /**
- * Get block from remote block managers.
- */
- def getRemote(blockId: String): Option[Iterator[Any]] = {
- if (blockId == null) {
- throw new IllegalArgumentException("Block Id is null")
- }
- logDebug("Getting remote block " + blockId)
- // Get locations of block
- val locations = master.getLocations(blockId)
-
- // Get block from remote locations
- for (loc <- locations) {
- logDebug("Getting remote block " + blockId + " from " + loc)
- val data = BlockManagerWorker.syncGetBlock(
- GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
- if (data != null) {
- return Some(dataDeserialize(blockId, data))
- }
- logDebug("The value of block " + blockId + " is null")
- }
- logDebug("Block " + blockId + " not found")
- return None
- }
-
- /**
- * Get a block from the block manager (either local or remote).
- */
- def get(blockId: String): Option[Iterator[Any]] = {
- getLocal(blockId).orElse(getRemote(blockId))
- }
-
- /**
- * Get multiple blocks from local and remote block manager using their BlockManagerIds. Returns
- * an Iterator of (block ID, value) pairs so that clients may handle blocks in a pipelined
- * fashion as they're received. Expects a size in bytes to be provided for each block fetched,
- * so that we can control maxBytesInFlight for the fetch.
- */
- def getMultiple(
- blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])], serializer: Serializer)
- : BlockFetcherIterator = {
-
- val iter =
- if (System.getProperty("spark.shuffle.use.netty", "false").toBoolean) {
- new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer)
- } else {
- new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer)
- }
-
- iter.initialize()
- iter
- }
-
- def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean)
- : Long = {
- val elements = new ArrayBuffer[Any]
- elements ++= values
- put(blockId, elements, level, tellMaster)
- }
-
- /**
- * A short-circuited method to get a block writer that can write data directly to disk.
- * This is currently used for writing shuffle files out. Callers should handle error
- * cases.
- */
- def getDiskBlockWriter(blockId: String, serializer: Serializer, bufferSize: Int)
- : BlockObjectWriter = {
- val writer = diskStore.getBlockWriter(blockId, serializer, bufferSize)
- writer.registerCloseEventHandler(() => {
- val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false)
- blockInfo.put(blockId, myInfo)
- myInfo.markReady(writer.size())
- })
- writer
- }
-
- /**
- * Put a new block of values to the block manager. Returns its (estimated) size in bytes.
- */
- def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
- tellMaster: Boolean = true) : Long = {
-
- if (blockId == null) {
- throw new IllegalArgumentException("Block Id is null")
- }
- if (values == null) {
- throw new IllegalArgumentException("Values is null")
- }
- if (level == null || !level.isValid) {
- throw new IllegalArgumentException("Storage level is null or invalid")
- }
-
- // Remember the block's storage level so that we can correctly drop it to disk if it needs
- // to be dropped right after it got put into memory. Note, however, that other threads will
- // not be able to get() this block until we call markReady on its BlockInfo.
- val myInfo = {
- val tinfo = new BlockInfo(level, tellMaster)
- // Do this atomically!
- val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
-
- if (oldBlockOpt.isDefined) {
- if (oldBlockOpt.get.waitForReady()) {
- logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
- return oldBlockOpt.get.size
- }
-
- // TODO: The block info exists, but the previous attempt to load it failed. What do we do now? Retry?
- oldBlockOpt.get
- } else {
- tinfo
- }
- }
-
- val startTimeMs = System.currentTimeMillis
-
- // If we need to replicate the data, we'll want access to the values, but because our
- // put will read the whole iterator, there will be no values left. For the case where
- // the put serializes data, we'll remember the bytes, above; but for the case where it
- // doesn't, such as deserialized storage, let's rely on the put returning an Iterator.
- var valuesAfterPut: Iterator[Any] = null
-
- // Ditto for the bytes after the put
- var bytesAfterPut: ByteBuffer = null
-
- // Size of the block in bytes (to return to caller)
- var size = 0L
-
- myInfo.synchronized {
- logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
- + " to get into synchronized block")
-
- var marked = false
- try {
- if (level.useMemory) {
- // Save it just to memory first, even if it also has useDisk set to true; we will later
- // drop it to disk if the memory store can't hold it.
- val res = memoryStore.putValues(blockId, values, level, true)
- size = res.size
- res.data match {
- case Right(newBytes) => bytesAfterPut = newBytes
- case Left(newIterator) => valuesAfterPut = newIterator
- }
- } else {
- // Save directly to disk.
- // Don't get back the bytes unless we replicate them.
- val askForBytes = level.replication > 1
- val res = diskStore.putValues(blockId, values, level, askForBytes)
- size = res.size
- res.data match {
- case Right(newBytes) => bytesAfterPut = newBytes
- case _ =>
- }
- }
-
- // Now that the block is in either the memory or disk store, let other threads read it,
- // and tell the master about it.
- marked = true
- myInfo.markReady(size)
- if (tellMaster) {
- reportBlockStatus(blockId, myInfo)
- }
- } finally {
- // If we failed at putting the block to memory/disk, notify other possible readers
- // that it has failed, and then remove it from the block info map.
- if (! marked) {
- // Note that the remove must happen before markFailure otherwise another thread
- // could've inserted a new BlockInfo before we remove it.
- blockInfo.remove(blockId)
- myInfo.markFailure()
- logWarning("Putting block " + blockId + " failed")
- }
- }
- }
- logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
-
- // Replicate block if required
- if (level.replication > 1) {
- val remoteStartTime = System.currentTimeMillis
- // Serialize the block if not already done
- if (bytesAfterPut == null) {
- if (valuesAfterPut == null) {
- throw new SparkException(
- "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
- }
- bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
- }
- replicate(blockId, bytesAfterPut, level)
- logDebug("Put block " + blockId + " remotely took " + Utils.getUsedTimeMs(remoteStartTime))
- }
- BlockManager.dispose(bytesAfterPut)
-
- return size
- }
-
-
- /**
- * Put a new block of serialized bytes to the block manager.
- */
- def putBytes(
- blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {
-
- if (blockId == null) {
- throw new IllegalArgumentException("Block Id is null")
- }
- if (bytes == null) {
- throw new IllegalArgumentException("Bytes is null")
- }
- if (level == null || !level.isValid) {
- throw new IllegalArgumentException("Storage level is null or invalid")
- }
-
- // Remember the block's storage level so that we can correctly drop it to disk if it needs
- // to be dropped right after it got put into memory. Note, however, that other threads will
- // not be able to get() this block until we call markReady on its BlockInfo.
- val myInfo = {
- val tinfo = new BlockInfo(level, tellMaster)
- // Do this atomically!
- val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
-
- if (oldBlockOpt.isDefined) {
- if (oldBlockOpt.get.waitForReady()) {
- logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
- return
- }
-
- // TODO: The block info exists, but the previous attempt to load it failed. What do we do now? Retry?
- oldBlockOpt.get
- } else {
- tinfo
- }
- }
-
- val startTimeMs = System.currentTimeMillis
-
- // Initiate the replication before storing it locally. This is faster as
- // data is already serialized and ready for sending
- val replicationFuture = if (level.replication > 1) {
- val bufferView = bytes.duplicate() // Doesn't copy the bytes, just creates a wrapper
- Future {
- replicate(blockId, bufferView, level)
- }
- } else {
- null
- }
-
- myInfo.synchronized {
- logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
- + " to get into synchronized block")
-
- var marked = false
- try {
- if (level.useMemory) {
- // Store it only in memory at first, even if useDisk is also set to true
- bytes.rewind()
- memoryStore.putBytes(blockId, bytes, level)
- } else {
- bytes.rewind()
- diskStore.putBytes(blockId, bytes, level)
- }
-
- // assert (0 == bytes.position(), "" + bytes)
-
- // Now that the block is in either the memory or disk store, let other threads read it,
- // and tell the master about it.
- marked = true
- myInfo.markReady(bytes.limit)
- if (tellMaster) {
- reportBlockStatus(blockId, myInfo)
- }
- } finally {
- // If we failed at putting the block to memory/disk, notify other possible readers
- // that it has failed, and then remove it from the block info map.
- if (! marked) {
- // Note that the remove must happen before markFailure otherwise another thread
- // could've inserted a new BlockInfo before we remove it.
- blockInfo.remove(blockId)
- myInfo.markFailure()
- logWarning("Putting block " + blockId + " failed")
- }
- }
- }
-
- // If replication had started, then wait for it to finish
- if (level.replication > 1) {
- Await.ready(replicationFuture, Duration.Inf)
- }
-
- if (level.replication > 1) {
- logDebug("PutBytes for block " + blockId + " with replication took " +
- Utils.getUsedTimeMs(startTimeMs))
- } else {
- logDebug("PutBytes for block " + blockId + " without replication took " +
- Utils.getUsedTimeMs(startTimeMs))
- }
- }
-
- /**
- * Replicate block to another node.
- */
- var cachedPeers: Seq[BlockManagerId] = null
- private def replicate(blockId: String, data: ByteBuffer, level: StorageLevel) {
- val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
- if (cachedPeers == null) {
- cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
- }
- for (peer: BlockManagerId <- cachedPeers) {
- val start = System.nanoTime
- data.rewind()
- logDebug("Try to replicate BlockId " + blockId + " once; The size of the data is "
- + data.limit() + " Bytes. To node: " + peer)
- if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data, tLevel),
- new ConnectionManagerId(peer.host, peer.port))) {
- logError("Failed to call syncPutBlock to " + peer)
- }
- logDebug("Replicated BlockId " + blockId + " once used " +
- (System.nanoTime - start) / 1e6 + " s; The size of the data is " +
- data.limit() + " bytes.")
- }
- }
-
- /**
- * Read a block consisting of a single object.
- */
- def getSingle(blockId: String): Option[Any] = {
- get(blockId).map(_.next())
- }
-
- /**
- * Write a block consisting of a single object.
- */
- def putSingle(blockId: String, value: Any, level: StorageLevel, tellMaster: Boolean = true) {
- put(blockId, Iterator(value), level, tellMaster)
- }
-
- /**
- * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
- * store reaches its limit and needs to free up space.
- */
- def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
- logInfo("Dropping block " + blockId + " from memory")
- val info = blockInfo.get(blockId).orNull
- if (info != null) {
- info.synchronized {
- // Required? As of now, this will be invoked only for blocks which are already ready,
- // but in case this changes in the future, we check for consistency's sake.
- if (!info.waitForReady()) {
- // If we get here, the block write failed.
- logWarning("Block " + blockId + " was marked as failure. Nothing to drop")
- return
- }
-
- val level = info.level
- if (level.useDisk && !diskStore.contains(blockId)) {
- logInfo("Writing block " + blockId + " to disk")
- data match {
- case Left(elements) =>
- diskStore.putValues(blockId, elements, level, false)
- case Right(bytes) =>
- diskStore.putBytes(blockId, bytes, level)
- }
- }
- val droppedMemorySize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
- val blockWasRemoved = memoryStore.remove(blockId)
- if (!blockWasRemoved) {
- logWarning("Block " + blockId + " could not be dropped from memory as it does not exist")
- }
- if (info.tellMaster) {
- reportBlockStatus(blockId, info, droppedMemorySize)
- }
- if (!level.useDisk) {
- // The block is completely gone from this node; forget it so we can put() it again later.
- blockInfo.remove(blockId)
- }
- }
- } else {
- // The block has already been dropped
- }
- }
-
- /**
- * Remove all blocks belonging to the given RDD.
- * @return The number of blocks removed.
- */
- def removeRdd(rddId: Int): Int = {
- // TODO: Instead of doing a linear scan on the blockInfo map, create another map that maps
- // from RDD.id to blocks.
- logInfo("Removing RDD " + rddId)
- val rddPrefix = "rdd_" + rddId + "_"
- val blocksToRemove = blockInfo.filter(_._1.startsWith(rddPrefix)).map(_._1)
- blocksToRemove.foreach(blockId => removeBlock(blockId, false))
- blocksToRemove.size
- }
-
- /**
- * Remove a block from both memory and disk.
- */
- def removeBlock(blockId: String, tellMaster: Boolean = true) {
- logInfo("Removing block " + blockId)
- val info = blockInfo.get(blockId).orNull
- if (info != null) info.synchronized {
- // Removals are idempotent in disk store and memory store. At worst, we get a warning.
- val removedFromMemory = memoryStore.remove(blockId)
- val removedFromDisk = diskStore.remove(blockId)
- if (!removedFromMemory && !removedFromDisk) {
- logWarning("Block " + blockId + " could not be removed as it was not found in either " +
- "the disk or memory store")
- }
- blockInfo.remove(blockId)
- if (tellMaster && info.tellMaster) {
- reportBlockStatus(blockId, info)
- }
- } else {
- // The block has already been removed; do nothing.
- logWarning("Asked to remove block " + blockId + ", which does not exist")
- }
- }
-
- def dropOldBlocks(cleanupTime: Long) {
- logInfo("Dropping blocks older than " + cleanupTime)
- val iterator = blockInfo.internalMap.entrySet().iterator()
- while (iterator.hasNext) {
- val entry = iterator.next()
- val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
- if (time < cleanupTime) {
- info.synchronized {
- val level = info.level
- if (level.useMemory) {
- memoryStore.remove(id)
- }
- if (level.useDisk) {
- diskStore.remove(id)
- }
- iterator.remove()
- logInfo("Dropped block " + id)
- }
- reportBlockStatus(id, info)
- }
- }
- }
-
- def shouldCompress(blockId: String): Boolean = {
- if (ShuffleBlockManager.isShuffle(blockId)) {
- compressShuffle
- } else if (blockId.startsWith("broadcast_")) {
- compressBroadcast
- } else if (blockId.startsWith("rdd_")) {
- compressRdds
- } else {
- false // Won't happen in a real cluster, but it can in tests
- }
- }
-
- /**
- * Wrap an output stream for compression if block compression is enabled for its block type
- */
- def wrapForCompression(blockId: String, s: OutputStream): OutputStream = {
- if (shouldCompress(blockId)) compressionCodec.compressedOutputStream(s) else s
- }
-
- /**
- * Wrap an input stream for compression if block compression is enabled for its block type
- */
- def wrapForCompression(blockId: String, s: InputStream): InputStream = {
- if (shouldCompress(blockId)) compressionCodec.compressedInputStream(s) else s
- }
-
- def dataSerialize(
- blockId: String,
- values: Iterator[Any],
- serializer: Serializer = defaultSerializer): ByteBuffer = {
- val byteStream = new FastByteArrayOutputStream(4096)
- val ser = serializer.newInstance()
- ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
- byteStream.trim()
- ByteBuffer.wrap(byteStream.array)
- }
-
- /**
- * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
- * the iterator is reached.
- */
- def dataDeserialize(
- blockId: String,
- bytes: ByteBuffer,
- serializer: Serializer = defaultSerializer): Iterator[Any] = {
- bytes.rewind()
- val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
- serializer.newInstance().deserializeStream(stream).asIterator
- }
-
- def stop() {
- if (heartBeatTask != null) {
- heartBeatTask.cancel()
- }
- connectionManager.stop()
- actorSystem.stop(slaveActor)
- blockInfo.clear()
- memoryStore.clear()
- diskStore.clear()
- metadataCleaner.cancel()
- logInfo("BlockManager stopped")
- }
-}
-
-
-private[spark] object BlockManager extends Logging {
-
- val ID_GENERATOR = new IdGenerator
-
- def getMaxMemoryFromSystemProperties: Long = {
- val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble
- (Runtime.getRuntime.maxMemory * memoryFraction).toLong
- }
-
- def getHeartBeatFrequencyFromSystemProperties: Long =
- System.getProperty("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4
-
- def getDisableHeartBeatsForTesting: Boolean =
- System.getProperty("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
-
- /**
- * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
- * might cause errors if one attempts to read from the unmapped buffer, but it's better than
- * waiting for the GC to find it because that could lead to huge numbers of open files. There's
- * unfortunately no standard API to do this.
- */
- def dispose(buffer: ByteBuffer) {
- if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
- logTrace("Unmapping " + buffer)
- if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
- buffer.asInstanceOf[DirectBuffer].cleaner().clean()
- }
- }
- }
-
- def blockIdsToBlockManagers(
- blockIds: Array[String],
- env: SparkEnv,
- blockManagerMaster: BlockManagerMaster = null)
- : Map[String, Seq[BlockManagerId]] =
- {
- // env == null and blockManagerMaster != null is used in tests
- assert (env != null || blockManagerMaster != null)
- val blockLocations: Seq[Seq[BlockManagerId]] = if (env != null) {
- env.blockManager.getLocationBlockIds(blockIds)
- } else {
- blockManagerMaster.getLocations(blockIds)
- }
-
- val blockManagers = new HashMap[String, Seq[BlockManagerId]]
- for (i <- 0 until blockIds.length) {
- blockManagers(blockIds(i)) = blockLocations(i)
- }
- blockManagers.toMap
- }
-
- def blockIdsToExecutorIds(
- blockIds: Array[String],
- env: SparkEnv,
- blockManagerMaster: BlockManagerMaster = null)
- : Map[String, Seq[String]] =
- {
- blockIdsToBlockManagers(blockIds, env, blockManagerMaster).mapValues(s => s.map(_.executorId))
- }
-
- def blockIdsToHosts(
- blockIds: Array[String],
- env: SparkEnv,
- blockManagerMaster: BlockManagerMaster = null)
- : Map[String, Seq[String]] =
- {
- blockIdsToBlockManagers(blockIds, env, blockManagerMaster).mapValues(s => s.map(_.host))
- }
-}
-
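The put()/get() pair above is the main client surface of this class: put() consumes an
iterator of values, stores them in the memory and/or disk store according to the
StorageLevel, reports the block to the master, and replicates it if requested; get()
checks the local stores first and falls back to remote block managers. Below is a minimal
sketch of how a caller might drive that API, assuming an already-constructed BlockManager
`bm`; the helper names are illustrative (the "rdd_<id>_<split>" block ID follows the
convention used by removeRdd above), not code from this commit:

    import spark.storage.{BlockManager, StorageLevel}

    // Cache one partition's values; put() consumes the iterator and returns
    // the (estimated) size in bytes of the stored block.
    def cachePartition(bm: BlockManager, rddId: Int, split: Int,
                       values: Iterator[Any]): Long = {
      val blockId = "rdd_" + rddId + "_" + split
      bm.put(blockId, values, StorageLevel.MEMORY_AND_DISK, true)
    }

    // Read it back: local memory first, then local disk, then remote peers.
    def readPartition(bm: BlockManager, rddId: Int, split: Int): Option[Iterator[Any]] =
      bm.get("rdd_" + rddId + "_" + split)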
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockManagerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockManagerId.scala b/core/src/main/scala/spark/storage/BlockManagerId.scala
deleted file mode 100644
index b36a617..0000000
--- a/core/src/main/scala/spark/storage/BlockManagerId.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.storage
-
-import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
-import java.util.concurrent.ConcurrentHashMap
-import spark.Utils
-
-/**
- * This class represents a unique identifier for a BlockManager.
- * The first two constructors of this class are made private to ensure that
- * BlockManagerId objects can be created only using the apply method in
- * the companion object. This allows de-duplication of ID objects.
- * Also, constructor parameters are private so that they cannot
- * be modified from outside this class.
- */
-private[spark] class BlockManagerId private (
- private var executorId_ : String,
- private var host_ : String,
- private var port_ : Int,
- private var nettyPort_ : Int
- ) extends Externalizable {
-
- private def this() = this(null, null, 0, 0) // For deserialization only
-
- def executorId: String = executorId_
-
- if (null != host_) {
- Utils.checkHost(host_, "Expected hostname")
- assert (port_ > 0)
- }
-
- def hostPort: String = {
- // DEBUG code
- Utils.checkHost(host)
- assert (port > 0)
-
- host + ":" + port
- }
-
- def host: String = host_
-
- def port: Int = port_
-
- def nettyPort: Int = nettyPort_
-
- override def writeExternal(out: ObjectOutput) {
- out.writeUTF(executorId_)
- out.writeUTF(host_)
- out.writeInt(port_)
- out.writeInt(nettyPort_)
- }
-
- override def readExternal(in: ObjectInput) {
- executorId_ = in.readUTF()
- host_ = in.readUTF()
- port_ = in.readInt()
- nettyPort_ = in.readInt()
- }
-
- @throws(classOf[IOException])
- private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this)
-
- override def toString = "BlockManagerId(%s, %s, %d, %d)".format(executorId, host, port, nettyPort)
-
- override def hashCode: Int = (executorId.hashCode * 41 + host.hashCode) * 41 + port + nettyPort
-
- override def equals(that: Any) = that match {
- case id: BlockManagerId =>
- executorId == id.executorId && port == id.port && host == id.host && nettyPort == id.nettyPort
- case _ =>
- false
- }
-}
-
-
-private[spark] object BlockManagerId {
-
- /**
- * Returns a [[spark.storage.BlockManagerId]] for the given configuration.
- *
- * @param execId ID of the executor.
- * @param host Host name of the block manager.
- * @param port Port of the block manager.
- * @param nettyPort Optional port for the Netty-based shuffle sender.
- * @return A new [[spark.storage.BlockManagerId]].
- */
- def apply(execId: String, host: String, port: Int, nettyPort: Int) =
- getCachedBlockManagerId(new BlockManagerId(execId, host, port, nettyPort))
-
- def apply(in: ObjectInput) = {
- val obj = new BlockManagerId()
- obj.readExternal(in)
- getCachedBlockManagerId(obj)
- }
-
- val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
-
- def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
- blockManagerIdCache.putIfAbsent(id, id)
- blockManagerIdCache.get(id)
- }
-}
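getCachedBlockManagerId above is a small intern table: ConcurrentHashMap.putIfAbsent keeps
the first instance stored for a given key, and the following get returns that canonical
instance, so equal IDs deserialized many times (readResolve routes every deserialized copy
through the cache) collapse to a single shared object. A standalone sketch of the same
idiom, with illustrative names only:

    import java.util.concurrent.ConcurrentHashMap

    class Interner[T] {
      private val cache = new ConcurrentHashMap[T, T]()
      def intern(value: T): T = {
        cache.putIfAbsent(value, value) // no-op if an equal key is already present
        cache.get(value)                // always returns the first instance stored
      }
    }

    val interner = new Interner[String]
    val a = interner.intern(new String("executor-1:7077"))
    val b = interner.intern(new String("executor-1:7077"))
    assert(a eq b) // reference equality: both point at the cached instance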
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
deleted file mode 100644
index 76128e8..0000000
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.storage
-
-import akka.actor.ActorRef
-import akka.dispatch.{Await, Future}
-import akka.pattern.ask
-import akka.util.Duration
-
-import spark.{Logging, SparkException}
-import spark.storage.BlockManagerMessages._
-
-
-private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging {
-
- val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
- val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
-
- val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
-
- val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
-
- /** Remove a dead executor from the driver actor. This is only called on the driver side. */
- def removeExecutor(execId: String) {
- tell(RemoveExecutor(execId))
- logInfo("Removed " + execId + " successfully in removeExecutor")
- }
-
- /**
- * Send the driver actor a heartbeat from the slave. Returns true if everything works out,
- * false if the driver does not know about the given block manager, which means the block
- * manager should re-register.
- */
- def sendHeartBeat(blockManagerId: BlockManagerId): Boolean = {
- askDriverWithReply[Boolean](HeartBeat(blockManagerId))
- }
-
- /** Register the BlockManager's id with the driver. */
- def registerBlockManager(
- blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
- logInfo("Trying to register BlockManager")
- tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
- logInfo("Registered BlockManager")
- }
-
- def updateBlockInfo(
- blockManagerId: BlockManagerId,
- blockId: String,
- storageLevel: StorageLevel,
- memSize: Long,
- diskSize: Long): Boolean = {
- val res = askDriverWithReply[Boolean](
- UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
- logInfo("Updated info of block " + blockId)
- res
- }
-
- /** Get locations of the blockId from the driver */
- def getLocations(blockId: String): Seq[BlockManagerId] = {
- askDriverWithReply[Seq[BlockManagerId]](GetLocations(blockId))
- }
-
- /** Get locations of multiple blockIds from the driver */
- def getLocations(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = {
- askDriverWithReply[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
- }
-
- /** Get ids of other nodes in the cluster from the driver */
- def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
- val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
- if (result.length != numPeers) {
- throw new SparkException(
- "Error getting peers, only got " + result.size + " instead of " + numPeers)
- }
- result
- }
-
- /**
- * Remove a block from the slaves that have it. This can only be used to remove
- * blocks that the driver knows about.
- */
- def removeBlock(blockId: String) {
- askDriverWithReply(RemoveBlock(blockId))
- }
-
- /**
- * Remove all blocks belonging to the given RDD.
- */
- def removeRdd(rddId: Int, blocking: Boolean) {
- val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
- future onFailure {
- case e: Throwable => logError("Failed to remove RDD " + rddId, e)
- }
- if (blocking) {
- Await.result(future, timeout)
- }
- }
-
- /**
- * Return the memory status for each block manager, in the form of a map from
- * the block manager's id to two long values. The first value is the maximum
- * amount of memory allocated for the block manager, while the second is the
- * amount of remaining memory.
- */
- def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
- askDriverWithReply[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
- }
-
- def getStorageStatus: Array[StorageStatus] = {
- askDriverWithReply[Array[StorageStatus]](GetStorageStatus)
- }
-
- /** Stop the driver actor, called only on the Spark driver node */
- def stop() {
- if (driverActor != null) {
- tell(StopBlockManagerMaster)
- driverActor = null
- logInfo("BlockManagerMaster stopped")
- }
- }
-
- /** Send a one-way message to the master actor, to which we expect it to reply with true. */
- private def tell(message: Any) {
- if (!askDriverWithReply[Boolean](message)) {
- throw new SparkException("BlockManagerMasterActor returned false, expected true.")
- }
- }
-
- /**
- * Send a message to the driver actor and get its result within a default timeout, or
- * throw a SparkException if this fails.
- */
- private def askDriverWithReply[T](message: Any): T = {
- // TODO: Consider removing multiple attempts
- if (driverActor == null) {
- throw new SparkException("Error sending message to BlockManager as driverActor is null " +
- "[message = " + message + "]")
- }
- var attempts = 0
- var lastException: Exception = null
- while (attempts < AKKA_RETRY_ATTEMPTS) {
- attempts += 1
- try {
- val future = driverActor.ask(message)(timeout)
- val result = Await.result(future, timeout)
- if (result == null) {
- throw new SparkException("BlockManagerMaster returned null")
- }
- return result.asInstanceOf[T]
- } catch {
- case ie: InterruptedException => throw ie
- case e: Exception =>
- lastException = e
- logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
- }
- Thread.sleep(AKKA_RETRY_INTERVAL_MS)
- }
-
- throw new SparkException(
- "Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
- }
-
-}
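askDriverWithReply above is a bounded retry loop around Akka's ask pattern: up to
AKKA_RETRY_ATTEMPTS tries, a fixed AKKA_RETRY_INTERVAL_MS sleep between them, interrupts
rethrown immediately, and the last exception attached if every attempt fails. The same
contract, reduced to a framework-free sketch (the names and the sendOnce parameter are
illustrative, not Spark API):

    def askWithRetries[T](maxAttempts: Int, retryWaitMs: Long)(sendOnce: () => T): T = {
      var attempts = 0
      var lastException: Exception = null
      while (attempts < maxAttempts) {
        attempts += 1
        try {
          val result = sendOnce()
          if (result == null) throw new Exception("received a null reply")
          return result
        } catch {
          case ie: InterruptedException => throw ie // never swallow interrupts
          case e: Exception => lastException = e    // remember and retry
        }
        Thread.sleep(retryWaitMs)                   // fixed backoff between attempts
      }
      throw new Exception("all " + maxAttempts + " attempts failed", lastException)
    }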
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
deleted file mode 100644
index b7a981d..0000000
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.storage
-
-import java.util.{HashMap => JHashMap}
-
-import scala.collection.mutable
-import scala.collection.JavaConversions._
-
-import akka.actor.{Actor, ActorRef, Cancellable}
-import akka.dispatch.Future
-import akka.pattern.ask
-import akka.util.Duration
-import akka.util.duration._
-
-import spark.{Logging, Utils, SparkException}
-import spark.storage.BlockManagerMessages._
-
-
-/**
- * BlockManagerMasterActor is an actor on the master node that tracks the statuses of
- * all slaves' block managers.
- */
-private[spark]
-class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
-
- // Mapping from block manager id to the block manager's information.
- private val blockManagerInfo =
- new mutable.HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo]
-
- // Mapping from executor ID to block manager ID.
- private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
-
- // Mapping from block id to the set of block managers that have the block.
- private val blockLocations = new JHashMap[String, mutable.HashSet[BlockManagerId]]
-
- val akkaTimeout = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
-
- initLogging()
-
- val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
- "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
-
- val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs",
- "60000").toLong
-
- var timeoutCheckingTask: Cancellable = null
-
- override def preStart() {
- if (!BlockManager.getDisableHeartBeatsForTesting) {
- timeoutCheckingTask = context.system.scheduler.schedule(
- 0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
- }
- super.preStart()
- }
-
- def receive = {
- case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
- register(blockManagerId, maxMemSize, slaveActor)
- sender ! true
-
- case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
- // TODO: Ideally we want to handle all the message replies in receive instead of in the
- // individual private methods.
- updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
-
- case GetLocations(blockId) =>
- sender ! getLocations(blockId)
-
- case GetLocationsMultipleBlockIds(blockIds) =>
- sender ! getLocationsMultipleBlockIds(blockIds)
-
- case GetPeers(blockManagerId, size) =>
- sender ! getPeers(blockManagerId, size)
-
- case GetMemoryStatus =>
- sender ! memoryStatus
-
- case GetStorageStatus =>
- sender ! storageStatus
-
- case RemoveRdd(rddId) =>
- sender ! removeRdd(rddId)
-
- case RemoveBlock(blockId) =>
- removeBlockFromWorkers(blockId)
- sender ! true
-
- case RemoveExecutor(execId) =>
- removeExecutor(execId)
- sender ! true
-
- case StopBlockManagerMaster =>
- logInfo("Stopping BlockManagerMaster")
- sender ! true
- if (timeoutCheckingTask != null) {
- timeoutCheckingTask.cancel()
- }
- context.stop(self)
-
- case ExpireDeadHosts =>
- expireDeadHosts()
-
- case HeartBeat(blockManagerId) =>
- sender ! heartBeat(blockManagerId)
-
- case other =>
- logWarning("Got unknown message: " + other)
- }
-
- private def removeRdd(rddId: Int): Future[Seq[Int]] = {
- // First remove the metadata for the given RDD, and then asynchronously remove the blocks
- // from the slaves.
-
- val prefix = "rdd_" + rddId + "_"
- // Find all blocks for the given RDD, remove the block from both blockLocations and
- // the blockManagerInfo that is tracking the blocks.
- val blocks = blockLocations.keySet().filter(_.startsWith(prefix))
- blocks.foreach { blockId =>
- val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
- bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
- blockLocations.remove(blockId)
- }
-
- // Ask the slaves to remove the RDD, and put the result in a sequence of Futures.
- // The dispatcher is used as an implicit argument into the Future sequence construction.
- import context.dispatcher
- val removeMsg = RemoveRdd(rddId)
- Future.sequence(blockManagerInfo.values.map { bm =>
- bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Int]
- }.toSeq)
- }
-
- private def removeBlockManager(blockManagerId: BlockManagerId) {
- val info = blockManagerInfo(blockManagerId)
-
- // Remove the block manager from blockManagerIdByExecutor.
- blockManagerIdByExecutor -= blockManagerId.executorId
-
- // Remove it from blockManagerInfo and remove all the blocks.
- blockManagerInfo.remove(blockManagerId)
- val iterator = info.blocks.keySet.iterator
- while (iterator.hasNext) {
- val blockId = iterator.next
- val locations = blockLocations.get(blockId)
- locations -= blockManagerId
- if (locations.size == 0) {
- blockLocations.remove(blockId)
- }
- }
- }
-
- private def expireDeadHosts() {
- logTrace("Checking for hosts with no recent heart beats in BlockManagerMaster.")
- val now = System.currentTimeMillis()
- val minSeenTime = now - slaveTimeout
- val toRemove = new mutable.HashSet[BlockManagerId]
- for (info <- blockManagerInfo.values) {
- if (info.lastSeenMs < minSeenTime) {
- logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: " +
- (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
- toRemove += info.blockManagerId
- }
- }
- toRemove.foreach(removeBlockManager)
- }
-
- private def removeExecutor(execId: String) {
- logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
- blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
- }
-
- private def heartBeat(blockManagerId: BlockManagerId): Boolean = {
- if (!blockManagerInfo.contains(blockManagerId)) {
- blockManagerId.executorId == "<driver>" && !isLocal
- } else {
- blockManagerInfo(blockManagerId).updateLastSeenMs()
- true
- }
- }
-
- // Remove a block from the slaves that have it. This can only be used to remove
- // blocks that the master knows about.
- private def removeBlockFromWorkers(blockId: String) {
- val locations = blockLocations.get(blockId)
- if (locations != null) {
- locations.foreach { blockManagerId: BlockManagerId =>
- val blockManager = blockManagerInfo.get(blockManagerId)
- if (blockManager.isDefined) {
- // Remove the block from the slave's BlockManager.
- // Doesn't actually wait for a confirmation and the message might get lost.
- // If message loss becomes frequent, we should add retry logic here.
- blockManager.get.slaveActor ! RemoveBlock(blockId)
- }
- }
- }
- }
-
- // Return a map from the block manager id to max memory and remaining memory.
- private def memoryStatus: Map[BlockManagerId, (Long, Long)] = {
- blockManagerInfo.map { case(blockManagerId, info) =>
- (blockManagerId, (info.maxMem, info.remainingMem))
- }.toMap
- }
-
- private def storageStatus: Array[StorageStatus] = {
- blockManagerInfo.map { case(blockManagerId, info) =>
- import collection.JavaConverters._
- StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala.toMap)
- }.toArray
- }
-
- private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
- if (id.executorId == "<driver>" && !isLocal) {
- // Got a register message from the master node; don't register it
- } else if (!blockManagerInfo.contains(id)) {
- blockManagerIdByExecutor.get(id.executorId) match {
- case Some(manager) =>
- // A block manager of the same executor already exists.
- // This should never happen. Let's just quit.
- logError("Got two different block manager registrations on " + id.executorId)
- System.exit(1)
- case None =>
- blockManagerIdByExecutor(id.executorId) = id
- }
- blockManagerInfo(id) = new BlockManagerMasterActor.BlockManagerInfo(
- id, System.currentTimeMillis(), maxMemSize, slaveActor)
- }
- }
-
- private def updateBlockInfo(
- blockManagerId: BlockManagerId,
- blockId: String,
- storageLevel: StorageLevel,
- memSize: Long,
- diskSize: Long) {
-
- if (!blockManagerInfo.contains(blockManagerId)) {
- if (blockManagerId.executorId == "<driver>" && !isLocal) {
- // We intentionally do not register the master (except in local mode),
- // so we should not indicate failure.
- sender ! true
- } else {
- sender ! false
- }
- return
- }
-
- if (blockId == null) {
- blockManagerInfo(blockManagerId).updateLastSeenMs()
- sender ! true
- return
- }
-
- blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
-
- var locations: mutable.HashSet[BlockManagerId] = null
- if (blockLocations.containsKey(blockId)) {
- locations = blockLocations.get(blockId)
- } else {
- locations = new mutable.HashSet[BlockManagerId]
- blockLocations.put(blockId, locations)
- }
-
- if (storageLevel.isValid) {
- locations.add(blockManagerId)
- } else {
- locations.remove(blockManagerId)
- }
-
- // Remove the block from master tracking if it has been removed on all slaves.
- if (locations.size == 0) {
- blockLocations.remove(blockId)
- }
- sender ! true
- }
-
- private def getLocations(blockId: String): Seq[BlockManagerId] = {
- if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
- }
-
- private def getLocationsMultipleBlockIds(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = {
- blockIds.map(blockId => getLocations(blockId))
- }
-
- private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
- val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
-
- val selfIndex = peers.indexOf(blockManagerId)
- if (selfIndex == -1) {
- throw new SparkException("Self index for " + blockManagerId + " not found")
- }
-
- // Note that this logic will select the same node multiple times if there aren't enough peers
- Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
- }
-}
-
-
-private[spark]
-object BlockManagerMasterActor {
-
- case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
-
- class BlockManagerInfo(
- val blockManagerId: BlockManagerId,
- timeMs: Long,
- val maxMem: Long,
- val slaveActor: ActorRef)
- extends Logging {
-
- private var _lastSeenMs: Long = timeMs
- private var _remainingMem: Long = maxMem
-
- // Mapping from block id to its status.
- private val _blocks = new JHashMap[String, BlockStatus]
-
- logInfo("Registering block manager %s with %s RAM".format(
- blockManagerId.hostPort, Utils.bytesToString(maxMem)))
-
- def updateLastSeenMs() {
- _lastSeenMs = System.currentTimeMillis()
- }
-
- def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long,
- diskSize: Long) {
-
- updateLastSeenMs()
-
- if (_blocks.containsKey(blockId)) {
- // The block exists on the slave already. Credit back the memory the old copy
- // used (not the new memSize, which may differ) before applying the update.
- val original: BlockStatus = _blocks.get(blockId)
- if (original.storageLevel.useMemory) {
- _remainingMem += original.memSize
- }
- }
-
- if (storageLevel.isValid) {
- // isValid means it is either stored in-memory or on-disk.
- _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize))
- if (storageLevel.useMemory) {
- _remainingMem -= memSize
- logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
- Utils.bytesToString(_remainingMem)))
- }
- if (storageLevel.useDisk) {
- logInfo("Added %s on disk on %s (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
- }
- } else if (_blocks.containsKey(blockId)) {
- // If isValid is not true, drop the block.
- val blockStatus: BlockStatus = _blocks.get(blockId)
- _blocks.remove(blockId)
- if (blockStatus.storageLevel.useMemory) {
- _remainingMem += blockStatus.memSize
- logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.memSize),
- Utils.bytesToString(_remainingMem)))
- }
- if (blockStatus.storageLevel.useDisk) {
- logInfo("Removed %s on %s on disk (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
- }
- }
- }
-
- def removeBlock(blockId: String) {
- if (_blocks.containsKey(blockId)) {
- _remainingMem += _blocks.get(blockId).memSize
- _blocks.remove(blockId)
- }
- }
-
- def remainingMem: Long = _remainingMem
-
- def lastSeenMs: Long = _lastSeenMs
-
- def blocks: JHashMap[String, BlockStatus] = _blocks
-
- override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
-
- def clear() {
- _blocks.clear()
- }
- }
-}
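A note on the replication-target selection in getPeers above: the peers are
the `size` successors of the calling block manager in the master's
registration array, chosen by wrapping modulo arithmetic, so a cluster with
fewer than `size` other nodes will be handed the same peer (and eventually
the caller itself) more than once. A minimal standalone sketch of that
selection, with plain strings standing in for BlockManagerId:

    object PeerSelectionSketch {
      def selectPeers(peers: Array[String], self: String, size: Int): Seq[String] = {
        val selfIndex = peers.indexOf(self)
        require(selfIndex != -1, "Self index for " + self + " not found")
        // Walk forward from the slot after `self`, wrapping around the array.
        Array.tabulate(size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
      }

      def main(args: Array[String]) {
        val peers = Array("bm-0", "bm-1", "bm-2")
        // Asking for 4 peers when only 2 others exist repeats nodes:
        println(selectPeers(peers, "bm-0", 4))  // List(bm-1, bm-2, bm-0, bm-1)
      }
    }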
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockManagerMessages.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
deleted file mode 100644
index 9375a9c..0000000
--- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.storage
-
-import java.io.{Externalizable, ObjectInput, ObjectOutput}
-
-import akka.actor.ActorRef
-
-
-private[storage] object BlockManagerMessages {
- //////////////////////////////////////////////////////////////////////////////////
- // Messages from the master to slaves.
- //////////////////////////////////////////////////////////////////////////////////
- sealed trait ToBlockManagerSlave
-
- // Remove a block from the slaves that have it. This can only be used to remove
- // blocks that the master knows about.
- case class RemoveBlock(blockId: String) extends ToBlockManagerSlave
-
- // Remove all blocks belonging to a specific RDD.
- case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
-
-
- //////////////////////////////////////////////////////////////////////////////////
- // Messages from slaves to the master.
- //////////////////////////////////////////////////////////////////////////////////
- sealed trait ToBlockManagerMaster
-
- case class RegisterBlockManager(
- blockManagerId: BlockManagerId,
- maxMemSize: Long,
- sender: ActorRef)
- extends ToBlockManagerMaster
-
- case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
-
- class UpdateBlockInfo(
- var blockManagerId: BlockManagerId,
- var blockId: String,
- var storageLevel: StorageLevel,
- var memSize: Long,
- var diskSize: Long)
- extends ToBlockManagerMaster
- with Externalizable {
-
- def this() = this(null, null, null, 0, 0) // For deserialization only
-
- override def writeExternal(out: ObjectOutput) {
- blockManagerId.writeExternal(out)
- out.writeUTF(blockId)
- storageLevel.writeExternal(out)
- out.writeLong(memSize)
- out.writeLong(diskSize)
- }
-
- override def readExternal(in: ObjectInput) {
- blockManagerId = BlockManagerId(in)
- blockId = in.readUTF()
- storageLevel = StorageLevel(in)
- memSize = in.readLong()
- diskSize = in.readLong()
- }
- }
-
- object UpdateBlockInfo {
- def apply(blockManagerId: BlockManagerId,
- blockId: String,
- storageLevel: StorageLevel,
- memSize: Long,
- diskSize: Long): UpdateBlockInfo = {
- new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)
- }
-
- // For pattern-matching
- def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
- Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
- }
- }
-
- case class GetLocations(blockId: String) extends ToBlockManagerMaster
-
- case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
-
- case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
-
- case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
-
- case object StopBlockManagerMaster extends ToBlockManagerMaster
-
- case object GetMemoryStatus extends ToBlockManagerMaster
-
- case object ExpireDeadHosts extends ToBlockManagerMaster
-
- case object GetStorageStatus extends ToBlockManagerMaster
-}
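UpdateBlockInfo above is a plain class rather than a case class so that it
can hand-roll its wire format through Externalizable, while the companion's
apply/unapply restore case-class-style construction and pattern matching.
A minimal sketch of the same idiom, using a hypothetical Ping message that
is not part of this codebase:

    import java.io.{Externalizable, ObjectInput, ObjectOutput}

    class Ping(var hostPort: String, var timeMs: Long) extends Externalizable {
      def this() = this(null, 0L)  // no-arg constructor required for deserialization

      override def writeExternal(out: ObjectOutput) {
        out.writeUTF(hostPort)
        out.writeLong(timeMs)
      }

      override def readExternal(in: ObjectInput) {
        hostPort = in.readUTF()
        timeMs = in.readLong()
      }
    }

    object Ping {
      def apply(hostPort: String, timeMs: Long) = new Ping(hostPort, timeMs)
      def unapply(p: Ping): Option[(String, Long)] = Some((p.hostPort, p.timeMs))
    }

    // Pattern matching then works as with a case class:
    //   msg match { case Ping(host, t) => ... }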
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
deleted file mode 100644
index 6e5fb43..0000000
--- a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.storage
-
-import akka.actor.Actor
-
-import spark.storage.BlockManagerMessages._
-
-
-/**
- * An actor to take commands from the master to execute operations. For example,
- * this is used to remove blocks from the slave's BlockManager.
- */
-class BlockManagerSlaveActor(blockManager: BlockManager) extends Actor {
- override def receive = {
-
- case RemoveBlock(blockId) =>
- blockManager.removeBlock(blockId)
-
- case RemoveRdd(rddId) =>
- val numBlocksRemoved = blockManager.removeRdd(rddId)
- sender ! numBlocksRemoved
- }
-}
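Note the asymmetry in the handler above: RemoveBlock sends no reply, while
RemoveRdd answers with the number of blocks removed, so a caller that wants
that count must use the ask pattern. A hedged sketch of the calling side,
assuming the Akka 2.0-era API used elsewhere in this tree; the actor
reference, block id, and timeout are placeholders:

    import akka.actor.ActorRef
    import akka.dispatch.Await
    import akka.pattern.ask
    import akka.util.Timeout
    import akka.util.duration._

    import spark.storage.BlockManagerMessages._

    def removeRddBlocks(slaveActor: ActorRef, rddId: Int): Int = {
      implicit val timeout = Timeout(10 seconds)
      // Fire-and-forget: no confirmation, and the message may be lost.
      slaveActor ! RemoveBlock("rdd_" + rddId + "_0")
      // Ask: yields a Future completed by the actor's `sender ! numBlocksRemoved`.
      val future = slaveActor.ask(RemoveRdd(rddId))
      Await.result(future, timeout.duration).asInstanceOf[Int]
    }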
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockManagerSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockManagerSource.scala b/core/src/main/scala/spark/storage/BlockManagerSource.scala
deleted file mode 100644
index 2aecd1e..0000000
--- a/core/src/main/scala/spark/storage/BlockManagerSource.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-package spark.storage
-
- import com.codahale.metrics.{Gauge, MetricRegistry}
-
-import spark.metrics.source.Source
-
-
-private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source {
- val metricRegistry = new MetricRegistry()
- val sourceName = "BlockManager"
-
- metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
- override def getValue: Long = {
- val storageStatusList = blockManager.master.getStorageStatus
- // Use fold rather than reduce so an empty executor list yields 0 instead of throwing.
- val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _)
- maxMem / 1024 / 1024
- }
- })
-
- metricRegistry.register(MetricRegistry.name("memory", "remainingMem", "MBytes"), new Gauge[Long] {
- override def getValue: Long = {
- val storageStatusList = blockManager.master.getStorageStatus
- val remainingMem = storageStatusList.map(_.memRemaining).fold(0L)(_ + _)
- remainingMem / 1024 / 1024
- }
- })
-
- metricRegistry.register(MetricRegistry.name("memory", "memUsed", "MBytes"), new Gauge[Long] {
- override def getValue: Long = {
- val storageStatusList = blockManager.master.getStorageStatus
- val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _)
- val remainingMem = storageStatusList.map(_.memRemaining).fold(0L)(_ + _)
- (maxMem - remainingMem) / 1024 / 1024
- }
- })
-
- metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed", "MBytes"), new Gauge[Long] {
- override def getValue: Long = {
- val storageStatusList = blockManager.master.getStorageStatus
- val diskSpaceUsed = storageStatusList
- .flatMap(_.blocks.values.map(_.diskSize))
- .reduceOption(_ + _)
- .getOrElse(0L)
-
- diskSpaceUsed / 1024 / 1024
- }
- })
-}
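Each gauge above is pull-based: Codahale Metrics invokes getValue whenever a
reporter polls the registry, so every reading re-queries the master for a
fresh storage status rather than caching a snapshot. A minimal sketch of the
same Gauge pattern against a throwaway registry (the metric names here are
illustrative only):

    import com.codahale.metrics.{Gauge, MetricRegistry}

    val registry = new MetricRegistry()

    registry.register(MetricRegistry.name("example", "freeMem", "MBytes"),
      new Gauge[Long] {
        // Re-evaluated on every poll, so the reported value is always current.
        override def getValue: Long = Runtime.getRuntime.freeMemory / 1024 / 1024
      })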
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockManagerWorker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
deleted file mode 100644
index 39064bc..0000000
--- a/core/src/main/scala/spark/storage/BlockManagerWorker.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.storage
-
-import java.nio.ByteBuffer
-
-import spark.{Logging, Utils}
-import spark.network._
-
-/**
- * A network interface for BlockManager. Each slave should have one
- * BlockManagerWorker.
- *
- * TODO: Use event model.
- */
-private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
- initLogging()
-
- blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive)
-
- def onBlockMessageReceive(msg: Message, id: ConnectionManagerId): Option[Message] = {
- logDebug("Handling message " + msg)
- msg match {
- case bufferMessage: BufferMessage => {
- try {
- logDebug("Handling as a buffer message " + bufferMessage)
- val blockMessages = BlockMessageArray.fromBufferMessage(bufferMessage)
- logDebug("Parsed as a block message array")
- val responseMessages = blockMessages.flatMap(processBlockMessage)
- return Some(new BlockMessageArray(responseMessages).toBufferMessage)
- } catch {
- case e: Exception => logError("Exception handling buffer message", e)
- return None
- }
- }
- case otherMessage: Any => {
- logError("Unknown type message received: " + otherMessage)
- return None
- }
- }
- }
-
- def processBlockMessage(blockMessage: BlockMessage): Option[BlockMessage] = {
- blockMessage.getType match {
- case BlockMessage.TYPE_PUT_BLOCK => {
- val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
- logDebug("Received [" + pB + "]")
- putBlock(pB.id, pB.data, pB.level)
- return None
- }
- case BlockMessage.TYPE_GET_BLOCK => {
- val gB = new GetBlock(blockMessage.getId)
- logDebug("Received [" + gB + "]")
- val buffer = getBlock(gB.id)
- if (buffer == null) {
- return None
- }
- return Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer)))
- }
- case _ => return None
- }
- }
-
- private def putBlock(id: String, bytes: ByteBuffer, level: StorageLevel) {
- val startTimeMs = System.currentTimeMillis()
- logDebug("PutBlock " + id + " started from " + startTimeMs + " with data: " + bytes)
- blockManager.putBytes(id, bytes, level)
- logDebug("PutBlock " + id + " used " + Utils.getUsedTimeMs(startTimeMs)
- + " with data size: " + bytes.limit)
- }
-
- private def getBlock(id: String): ByteBuffer = {
- val startTimeMs = System.currentTimeMillis()
- logDebug("GetBlock " + id + " started from " + startTimeMs)
- val buffer = blockManager.getLocalBytes(id) match {
- case Some(bytes) => bytes
- case None => null
- }
- logDebug("GetBlock " + id + " used " + Utils.getUsedTimeMs(startTimeMs)
- + " and got buffer " + buffer)
- return buffer
- }
-}
-
-private[spark] object BlockManagerWorker extends Logging {
- private var blockManagerWorker: BlockManagerWorker = null
-
- initLogging()
-
- def startBlockManagerWorker(manager: BlockManager) {
- blockManagerWorker = new BlockManagerWorker(manager)
- }
-
- def syncPutBlock(msg: PutBlock, toConnManagerId: ConnectionManagerId): Boolean = {
- val blockManager = blockManagerWorker.blockManager
- val connectionManager = blockManager.connectionManager
- val blockMessage = BlockMessage.fromPutBlock(msg)
- val blockMessageArray = new BlockMessageArray(blockMessage)
- val resultMessage = connectionManager.sendMessageReliablySync(
- toConnManagerId, blockMessageArray.toBufferMessage)
- resultMessage.isDefined
- }
-
- def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
- val blockManager = blockManagerWorker.blockManager
- val connectionManager = blockManager.connectionManager
- val blockMessage = BlockMessage.fromGetBlock(msg)
- val blockMessageArray = new BlockMessageArray(blockMessage)
- val responseMessage = connectionManager.sendMessageReliablySync(
- toConnManagerId, blockMessageArray.toBufferMessage)
- responseMessage match {
- case Some(message) => {
- val bufferMessage = message.asInstanceOf[BufferMessage]
- logDebug("Response message received " + bufferMessage)
- // Return the payload of the first block message in the reply, if any.
- BlockMessageArray.fromBufferMessage(bufferMessage).headOption match {
- case Some(blockMessage) =>
- logDebug("Found " + blockMessage)
- blockMessage.getData
- case None => null
- }
- }
- case None => logDebug("No response message received"); null
- }
- }
-}
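The two sync* helpers above form the client side of this protocol: each wraps
its request in a single-element BlockMessageArray, sends it synchronously over
the ConnectionManager, and unwraps the reply. A hedged usage sketch, assuming
startBlockManagerWorker has already been called and `remoteConnManagerId` is
the ConnectionManagerId of a known peer (both placeholders):

    // Push a block to a remote slave, then fetch it back.
    val ok = BlockManagerWorker.syncPutBlock(
      PutBlock("block-1", ByteBuffer.allocate(16), StorageLevel.MEMORY_ONLY),
      remoteConnManagerId)
    if (ok) {
      // syncGetBlock returns null when the fetch fails, so callers must check.
      val bytes: ByteBuffer = BlockManagerWorker.syncGetBlock(
        GetBlock("block-1"), remoteConnManagerId)
    }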
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockMessage.scala b/core/src/main/scala/spark/storage/BlockMessage.scala
deleted file mode 100644
index bcce26b..0000000
--- a/core/src/main/scala/spark/storage/BlockMessage.scala
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.storage
-
-import java.nio.ByteBuffer
-
-import scala.collection.mutable.StringBuilder
-import scala.collection.mutable.ArrayBuffer
-
-import spark.network._
-
-private[spark] case class GetBlock(id: String)
-private[spark] case class GotBlock(id: String, data: ByteBuffer)
-private[spark] case class PutBlock(id: String, data: ByteBuffer, level: StorageLevel)
-
-private[spark] class BlockMessage() {
- // Un-initialized: typ = 0
- // GetBlock: typ = 1
- // GotBlock: typ = 2
- // PutBlock: typ = 3
- private var typ: Int = BlockMessage.TYPE_NON_INITIALIZED
- private var id: String = null
- private var data: ByteBuffer = null
- private var level: StorageLevel = null
-
- def set(getBlock: GetBlock) {
- typ = BlockMessage.TYPE_GET_BLOCK
- id = getBlock.id
- }
-
- def set(gotBlock: GotBlock) {
- typ = BlockMessage.TYPE_GOT_BLOCK
- id = gotBlock.id
- data = gotBlock.data
- }
-
- def set(putBlock: PutBlock) {
- typ = BlockMessage.TYPE_PUT_BLOCK
- id = putBlock.id
- data = putBlock.data
- level = putBlock.level
- }
-
- def set(buffer: ByteBuffer) {
- /*
- println()
- println("BlockMessage: ")
- while(buffer.remaining > 0) {
- print(buffer.get())
- }
- buffer.rewind()
- println()
- println()
- */
- typ = buffer.getInt()
- val idLength = buffer.getInt()
- val idBuilder = new StringBuilder(idLength)
- for (i <- 1 to idLength) {
- idBuilder += buffer.getChar()
- }
- id = idBuilder.toString()
-
- if (typ == BlockMessage.TYPE_PUT_BLOCK) {
-
- val booleanInt = buffer.getInt()
- val replication = buffer.getInt()
- level = StorageLevel(booleanInt, replication)
-
- val dataLength = buffer.getInt()
- data = ByteBuffer.allocate(dataLength)
- if (dataLength != buffer.remaining) {
- throw new Exception("Error parsing buffer")
- }
- data.put(buffer)
- data.flip()
- } else if (typ == BlockMessage.TYPE_GOT_BLOCK) {
-
- val dataLength = buffer.getInt()
- data = ByteBuffer.allocate(dataLength)
- if (dataLength != buffer.remaining) {
- throw new Exception("Error parsing buffer")
- }
- data.put(buffer)
- data.flip()
- }
- }
-
- def set(bufferMsg: BufferMessage) {
- val buffer = bufferMsg.buffers(0)
- buffer.clear()
- set(buffer)
- }
-
- def getType: Int = {
- return typ
- }
-
- def getId: String = {
- return id
- }
-
- def getData: ByteBuffer = {
- return data
- }
-
- def getLevel: StorageLevel = {
- return level
- }
-
- def toBufferMessage: BufferMessage = {
- val buffers = new ArrayBuffer[ByteBuffer]()
- var buffer = ByteBuffer.allocate(4 + 4 + id.length() * 2)
- buffer.putInt(typ).putInt(id.length())
- id.foreach((x: Char) => buffer.putChar(x))
- buffer.flip()
- buffers += buffer
-
- if (typ == BlockMessage.TYPE_PUT_BLOCK) {
- buffer = ByteBuffer.allocate(8).putInt(level.toInt).putInt(level.replication)
- buffer.flip()
- buffers += buffer
-
- buffer = ByteBuffer.allocate(4).putInt(data.remaining)
- buffer.flip()
- buffers += buffer
-
- buffers += data
- } else if (typ == BlockMessage.TYPE_GOT_BLOCK) {
- buffer = ByteBuffer.allocate(4).putInt(data.remaining)
- buffer.flip()
- buffers += buffer
-
- buffers += data
- }
-
- /*
- println()
- println("BlockMessage: ")
- buffers.foreach(b => {
- while(b.remaining > 0) {
- print(b.get())
- }
- b.rewind()
- })
- println()
- println()
- */
- return Message.createBufferMessage(buffers)
- }
-
- override def toString: String = {
- "BlockMessage [type = " + typ + ", id = " + id + ", level = " + level +
- ", data = " + (if (data != null) data.remaining.toString else "null") + "]"
- }
-}
-
-private[spark] object BlockMessage {
- val TYPE_NON_INITIALIZED: Int = 0
- val TYPE_GET_BLOCK: Int = 1
- val TYPE_GOT_BLOCK: Int = 2
- val TYPE_PUT_BLOCK: Int = 3
-
- def fromBufferMessage(bufferMessage: BufferMessage): BlockMessage = {
- val newBlockMessage = new BlockMessage()
- newBlockMessage.set(bufferMessage)
- newBlockMessage
- }
-
- def fromByteBuffer(buffer: ByteBuffer): BlockMessage = {
- val newBlockMessage = new BlockMessage()
- newBlockMessage.set(buffer)
- newBlockMessage
- }
-
- def fromGetBlock(getBlock: GetBlock): BlockMessage = {
- val newBlockMessage = new BlockMessage()
- newBlockMessage.set(getBlock)
- newBlockMessage
- }
-
- def fromGotBlock(gotBlock: GotBlock): BlockMessage = {
- val newBlockMessage = new BlockMessage()
- newBlockMessage.set(gotBlock)
- newBlockMessage
- }
-
- def fromPutBlock(putBlock: PutBlock): BlockMessage = {
- val newBlockMessage = new BlockMessage()
- newBlockMessage.set(putBlock)
- newBlockMessage
- }
-
- def main(args: Array[String]) {
- // Round-trip sanity check: build a PutBlock message, serialize it, parse it back.
- val original = new BlockMessage()
- original.set(new PutBlock("ABC", ByteBuffer.allocate(10), StorageLevel.MEMORY_AND_DISK_SER_2))
- val parsed = new BlockMessage()
- parsed.set(original.toBufferMessage)
-
- println(original.getId + " " + original.getLevel)
- println(parsed.getId + " " + parsed.getLevel)
- }
-}
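Taken together, set(buffer) and toBufferMessage above define the wire layout
of a block message; all integers use java.nio's default big-endian order.
Sketched as comments for each message type:

    // Common header:   [typ: Int][idLength: Int][id: idLength Chars, 2 bytes each]
    // PutBlock adds:    [storageLevel flags: Int][replication: Int]
    //                   [dataLength: Int][data: dataLength bytes]
    // GotBlock adds:    [dataLength: Int][data: dataLength bytes]
    // GetBlock carries: nothing beyond the common header.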