Posted to commits@spark.apache.org by pw...@apache.org on 2015/05/01 07:22:55 UTC

spark git commit: [SPARK-6479] [BLOCK MANAGER] Create off-heap block storage API

Repository: spark
Updated Branches:
  refs/heads/master b5347a466 -> 36a7a6807


[SPARK-6479] [BLOCK MANAGER] Create off-heap block storage API

This adds the classes for an off-heap block storage API. It also includes the migration of Tachyon onto the new API. The diff looks big, but it mostly just renames tachyon to offheap. A new implementation for HDFS will be submitted for review in SPARK-6112.
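
For reference, the user-facing change is essentially a configuration rename. A minimal sketch of the old and new keys (the values shown are just the defaults from this patch):

    import org.apache.spark.SparkConf

    // Before this patch (Tachyon-specific keys):
    val oldConf = new SparkConf()
      .set("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
      .set("spark.tachyonStore.url", "tachyon://localhost:19998")

    // After this patch (backend-neutral keys, same default values):
    val newConf = new SparkConf()
      .set("spark.externalBlockStore.baseDir", "/tmp_spark_tachyon")
      .set("spark.externalBlockStore.url", "tachyon://localhost:19998")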

Author: Zhan Zhang <zh...@gmail.com>

Closes #5430 from zhzhan/SPARK-6479 and squashes the following commits:

60acd84 [Zhan Zhang] minor change to kickoff the test
12f54c9 [Zhan Zhang] solve merge conflicts
a54132c [Zhan Zhang] solve review comments
ffb8e00 [Zhan Zhang] rebase to sparkcontext change
6e121e0 [Zhan Zhang] resolve review comments and restructure blockmanager code
a7aed6c [Zhan Zhang] add Tachyon migration code
186de31 [Zhan Zhang] initial commit for off-heap block storage api


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/36a7a680
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/36a7a680
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/36a7a680

Branch: refs/heads/master
Commit: 36a7a6807ee58c8ba309c0e09e547919246b7f82
Parents: b5347a4
Author: Zhan Zhang <zh...@gmail.com>
Authored: Thu Apr 30 22:24:31 2015 -0700
Committer: Patrick Wendell <pa...@databricks.com>
Committed: Thu Apr 30 22:24:31 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |   8 +-
 .../spark/executor/ExecutorExitCode.scala       |  16 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala   |   4 +-
 .../org/apache/spark/storage/BlockManager.scala |  63 +++----
 .../spark/storage/BlockManagerMaster.scala      |   5 +-
 .../storage/BlockManagerMasterEndpoint.scala    |  29 +--
 .../spark/storage/BlockManagerMessages.scala    |   6 +-
 .../spark/storage/ExternalBlockManager.scala    | 102 +++++++++++
 .../spark/storage/ExternalBlockStore.scala      | 181 +++++++++++++++++++
 .../org/apache/spark/storage/RDDInfo.scala      |   9 +-
 .../org/apache/spark/storage/StorageLevel.scala |   8 +-
 .../org/apache/spark/storage/StorageUtils.scala |  19 +-
 .../spark/storage/TachyonBlockManager.scala     | 138 ++++++++++----
 .../spark/storage/TachyonFileSegment.scala      |  30 ---
 .../org/apache/spark/storage/TachyonStore.scala | 128 -------------
 .../apache/spark/ui/storage/StoragePage.scala   |   4 +-
 .../org/apache/spark/util/JsonProtocol.scala    |  24 ++-
 .../spark/storage/BlockManagerSuite.scala       |   1 +
 .../org/apache/spark/storage/StorageSuite.scala |   8 +-
 .../spark/ui/storage/StorageTabSuite.scala      |   6 +-
 .../apache/spark/util/JsonProtocolSuite.scala   |  56 +++---
 docs/configuration.md                           |  22 ++-
 22 files changed, 536 insertions(+), 331 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/36a7a680/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index bae951f..fe24260 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -247,9 +247,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private[spark] def eventLogDir: Option[URI] = _eventLogDir
   private[spark] def eventLogCodec: Option[String] = _eventLogCodec
 
-  // Generate the random name for a temp folder in Tachyon
+  // Generate the random name for a temp folder in external block store.
   // Add a timestamp as the suffix here to make it more safe
-  val tachyonFolderName = "spark-" + randomUUID.toString()
+  val externalBlockStoreFolderName = "spark-" + randomUUID.toString()
+  @deprecated("Use externalBlockStoreFolderName instead.", "1.4.0")
+  val tachyonFolderName = externalBlockStoreFolderName
 
   def isLocal: Boolean = (master == "local" || master.startsWith("local["))
 
@@ -386,7 +388,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       }
     }
 
-    _conf.set("spark.tachyonStore.folderName", tachyonFolderName)
+    _conf.set("spark.externalBlockStore.folderName", externalBlockStoreFolderName)
 
     if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
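
The old field is kept as a deprecated alias of the new one, so existing user code still compiles (with a warning). Illustrative usage, assuming an existing SparkContext sc:

    // Both refer to the same randomly generated folder name; the first
    // emits a deprecation warning as of 1.4.0.
    val oldName = sc.tachyonFolderName
    val newName = sc.externalBlockStoreFolderName
    assert(oldName == newName)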
 

http://git-wip-us.apache.org/repos/asf/spark/blob/36a7a680/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
index 52862ae..ea36fb6 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
@@ -33,11 +33,11 @@ object ExecutorExitCode {
   /** DiskStore failed to create a local temporary directory after many attempts. */
   val DISK_STORE_FAILED_TO_CREATE_DIR = 53
 
-  /** TachyonStore failed to initialize after many attempts. */
-  val TACHYON_STORE_FAILED_TO_INITIALIZE = 54
+  /** ExternalBlockStore failed to initialize after many attempts. */
+  val EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE = 54
 
-  /** TachyonStore failed to create a local temporary directory after many attempts. */
-  val TACHYON_STORE_FAILED_TO_CREATE_DIR = 55
+  /** ExternalBlockStore failed to create a local temporary directory after many attempts. */
+  val EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR = 55
 
   def explainExitCode(exitCode: Int): String = {
     exitCode match {
@@ -46,9 +46,11 @@ object ExecutorExitCode {
       case OOM => "OutOfMemoryError"
       case DISK_STORE_FAILED_TO_CREATE_DIR =>
         "Failed to create local directory (bad spark.local.dir?)"
-      case TACHYON_STORE_FAILED_TO_INITIALIZE => "TachyonStore failed to initialize."
-      case TACHYON_STORE_FAILED_TO_CREATE_DIR =>
-        "TachyonStore failed to create a local temporary directory."
+      // TODO: replace external block store with concrete implementation name
+      case EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE => "ExternalBlockStore failed to initialize."
+      // TODO: replace external block store with concrete implementation name
+      case EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR =>
+        "ExternalBlockStore failed to create a local temporary directory."
       case _ =>
         "Unknown executor exit code (" + exitCode + ")" + (
           if (exitCode > 128) {
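
The exit-code constants keep their numeric values (54 and 55), so only the names and messages change. A quick sketch of the mapping, usable from Spark-internal code since the object is private[spark]:

    import org.apache.spark.executor.ExecutorExitCode

    // Still exit code 54, now with a backend-neutral name and message.
    ExecutorExitCode.explainExitCode(
      ExecutorExitCode.EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE)
    // => "ExternalBlockStore failed to initialize."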

http://git-wip-us.apache.org/repos/asf/spark/blob/36a7a680/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 330255f..31c07c7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1475,9 +1475,9 @@ abstract class RDD[T: ClassTag](
 
       val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else ""
       val storageInfo = rdd.context.getRDDStorageInfo.filter(_.id == rdd.id).map(info =>
-        "    CachedPartitions: %d; MemorySize: %s; TachyonSize: %s; DiskSize: %s".format(
+        "    CachedPartitions: %d; MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s".format(
           info.numCachedPartitions, bytesToString(info.memSize),
-          bytesToString(info.tachyonSize), bytesToString(info.diskSize)))
+          bytesToString(info.externalBlockStoreSize), bytesToString(info.diskSize)))
 
       s"$rdd [$persistence]" +: storageInfo
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/36a7a680/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 55718e5..402ee1c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -78,19 +78,11 @@ private[spark] class BlockManager(
   private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
 
   // Actual storage of where blocks are kept
-  private var tachyonInitialized = false
+  private var externalBlockStoreInitialized = false
   private[spark] val memoryStore = new MemoryStore(this, maxMemory)
   private[spark] val diskStore = new DiskStore(this, diskBlockManager)
-  private[spark] lazy val tachyonStore: TachyonStore = {
-    val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
-    val appFolderName = conf.get("spark.tachyonStore.folderName")
-    val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}"
-    val tachyonMaster = conf.get("spark.tachyonStore.url",  "tachyon://localhost:19998")
-    val tachyonBlockManager =
-      new TachyonBlockManager(this, tachyonStorePath, tachyonMaster)
-    tachyonInitialized = true
-    new TachyonStore(this, tachyonBlockManager)
-  }
+  private[spark] lazy val externalBlockStore: ExternalBlockStore =
+    new ExternalBlockStore(this, executorId)
 
   private[spark]
   val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
@@ -320,13 +312,13 @@ private[spark] class BlockManager(
 
   /**
    * Get the BlockStatus for the block identified by the given ID, if it exists.
-   * NOTE: This is mainly for testing, and it doesn't fetch information from Tachyon.
+   * NOTE: This is mainly for testing, and it doesn't fetch information from external block store.
    */
   def getStatus(blockId: BlockId): Option[BlockStatus] = {
     blockInfo.get(blockId).map { info =>
       val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
       val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
-      // Assume that block is not in Tachyon
+      // Assume that block is not in external block store
       BlockStatus(info.level, memSize, diskSize, 0L)
     }
   }
@@ -376,10 +368,10 @@ private[spark] class BlockManager(
     if (info.tellMaster) {
       val storageLevel = status.storageLevel
       val inMemSize = Math.max(status.memSize, droppedMemorySize)
-      val inTachyonSize = status.tachyonSize
+      val inExternalBlockStoreSize = status.externalBlockStoreSize
       val onDiskSize = status.diskSize
       master.updateBlockInfo(
-        blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inTachyonSize)
+        blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inExternalBlockStoreSize)
     } else {
       true
     }
@@ -397,15 +389,17 @@ private[spark] class BlockManager(
           BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
         case level =>
           val inMem = level.useMemory && memoryStore.contains(blockId)
-          val inTachyon = level.useOffHeap && tachyonStore.contains(blockId)
+          val inExternalBlockStore = level.useOffHeap && externalBlockStore.contains(blockId)
           val onDisk = level.useDisk && diskStore.contains(blockId)
           val deserialized = if (inMem) level.deserialized else false
-          val replication = if (inMem || inTachyon || onDisk) level.replication else 1
-          val storageLevel = StorageLevel(onDisk, inMem, inTachyon, deserialized, replication)
+          val replication = if (inMem || inExternalBlockStore || onDisk) level.replication else 1
+          val storageLevel =
+            StorageLevel(onDisk, inMem, inExternalBlockStore, deserialized, replication)
           val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
-          val tachyonSize = if (inTachyon) tachyonStore.getSize(blockId) else 0L
+          val externalBlockStoreSize =
+            if (inExternalBlockStore) externalBlockStore.getSize(blockId) else 0L
           val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
-          BlockStatus(storageLevel, memSize, diskSize, tachyonSize)
+          BlockStatus(storageLevel, memSize, diskSize, externalBlockStoreSize)
       }
     }
   }
@@ -485,11 +479,11 @@ private[spark] class BlockManager(
           }
         }
 
-        // Look for the block in Tachyon
+        // Look for the block in external block store
         if (level.useOffHeap) {
-          logDebug(s"Getting block $blockId from tachyon")
-          if (tachyonStore.contains(blockId)) {
-            tachyonStore.getBytes(blockId) match {
+          logDebug(s"Getting block $blockId from ExternalBlockStore")
+          if (externalBlockStore.contains(blockId)) {
+            externalBlockStore.getBytes(blockId) match {
               case Some(bytes) =>
                 if (!asBlockResult) {
                   return Some(bytes)
@@ -498,7 +492,7 @@ private[spark] class BlockManager(
                     dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
                 }
               case None =>
-                logDebug(s"Block $blockId not found in tachyon")
+                logDebug(s"Block $blockId not found in externalBlockStore")
             }
           }
         }
@@ -766,8 +760,8 @@ private[spark] class BlockManager(
             // We will drop it to disk later if the memory store can't hold it.
             (true, memoryStore)
           } else if (putLevel.useOffHeap) {
-            // Use tachyon for off-heap storage
-            (false, tachyonStore)
+            // Use external block store
+            (false, externalBlockStore)
           } else if (putLevel.useDisk) {
             // Don't get back the bytes from put unless we replicate them
             (putLevel.replication > 1, diskStore)
@@ -802,7 +796,7 @@ private[spark] class BlockManager(
 
         val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
         if (putBlockStatus.storageLevel != StorageLevel.NONE) {
-          // Now that the block is in either the memory, tachyon, or disk store,
+          // Now that the block is in either the memory, externalBlockStore, or disk store,
           // let other threads read it, and tell the master about it.
           marked = true
           putBlockInfo.markReady(size)
@@ -1099,10 +1093,11 @@ private[spark] class BlockManager(
         // Removals are idempotent in disk store and memory store. At worst, we get a warning.
         val removedFromMemory = memoryStore.remove(blockId)
         val removedFromDisk = diskStore.remove(blockId)
-        val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
-        if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
+        val removedFromExternalBlockStore =
+          if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
+        if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
           logWarning(s"Block $blockId could not be removed as it was not found in either " +
-            "the disk, memory, or tachyon store")
+            "the disk, memory, or external block store")
         }
         blockInfo.remove(blockId)
         if (tellMaster && info.tellMaster) {
@@ -1136,7 +1131,7 @@ private[spark] class BlockManager(
           val level = info.level
           if (level.useMemory) { memoryStore.remove(id) }
           if (level.useDisk) { diskStore.remove(id) }
-          if (level.useOffHeap) { tachyonStore.remove(id) }
+          if (level.useOffHeap) { externalBlockStore.remove(id) }
           iterator.remove()
           logInfo(s"Dropped block $id")
         }
@@ -1216,8 +1211,8 @@ private[spark] class BlockManager(
     blockInfo.clear()
     memoryStore.clear()
     diskStore.clear()
-    if (tachyonInitialized) {
-      tachyonStore.clear()
+    if (externalBlockStoreInitialized) {
+      externalBlockStore.clear()
     }
     metadataCleaner.cancel()
     broadcastCleaner.cancel()

http://git-wip-us.apache.org/repos/asf/spark/blob/36a7a680/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 9bfc420..a85e1c7 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -54,9 +54,10 @@ class BlockManagerMaster(
       storageLevel: StorageLevel,
       memSize: Long,
       diskSize: Long,
-      tachyonSize: Long): Boolean = {
+      externalBlockStoreSize: Long): Boolean = {
     val res = driverEndpoint.askWithRetry[Boolean](
-      UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize))
+      UpdateBlockInfo(blockManagerId, blockId, storageLevel,
+        memSize, diskSize, externalBlockStoreSize))
     logDebug(s"Updated info of block $blockId")
     res
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/36a7a680/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 7212362..3afb4c3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -60,9 +60,9 @@ class BlockManagerMasterEndpoint(
       context.reply(true)
 
     case UpdateBlockInfo(
-      blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) =>
+      blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) =>
       context.reply(updateBlockInfo(
-        blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize))
+        blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize))
 
     case GetLocations(blockId) =>
       context.reply(getLocations(blockId))
@@ -314,7 +314,7 @@ class BlockManagerMasterEndpoint(
       storageLevel: StorageLevel,
       memSize: Long,
       diskSize: Long,
-      tachyonSize: Long): Boolean = {
+      externalBlockStoreSize: Long): Boolean = {
 
     if (!blockManagerInfo.contains(blockManagerId)) {
       if (blockManagerId.isDriver && !isLocal) {
@@ -332,7 +332,7 @@ class BlockManagerMasterEndpoint(
     }
 
     blockManagerInfo(blockManagerId).updateBlockInfo(
-      blockId, storageLevel, memSize, diskSize, tachyonSize)
+      blockId, storageLevel, memSize, diskSize, externalBlockStoreSize)
 
     var locations: mutable.HashSet[BlockManagerId] = null
     if (blockLocations.containsKey(blockId)) {
@@ -396,8 +396,8 @@ case class BlockStatus(
     storageLevel: StorageLevel,
     memSize: Long,
     diskSize: Long,
-    tachyonSize: Long) {
-  def isCached: Boolean = memSize + diskSize + tachyonSize > 0
+    externalBlockStoreSize: Long) {
+  def isCached: Boolean = memSize + diskSize + externalBlockStoreSize > 0
 }
 
 @DeveloperApi
@@ -429,7 +429,7 @@ private[spark] class BlockManagerInfo(
       storageLevel: StorageLevel,
       memSize: Long,
       diskSize: Long,
-      tachyonSize: Long) {
+      externalBlockStoreSize: Long) {
 
     updateLastSeenMs()
 
@@ -445,9 +445,9 @@ private[spark] class BlockManagerInfo(
     }
 
     if (storageLevel.isValid) {
-      /* isValid means it is either stored in-memory, on-disk or on-Tachyon.
+      /* isValid means it is either stored in-memory, on-disk or on-externalBlockStore.
        * The memSize here indicates the data size in or dropped from memory,
-       * tachyonSize here indicates the data size in or dropped from Tachyon,
+       * externalBlockStoreSize here indicates the data size in or dropped from externalBlockStore,
        * and the diskSize here indicates the data size in or dropped to disk.
        * They can be both larger than 0, when a block is dropped from memory to disk.
        * Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
@@ -464,9 +464,9 @@ private[spark] class BlockManagerInfo(
           blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
       }
       if (storageLevel.useOffHeap) {
-        _blocks.put(blockId, BlockStatus(storageLevel, 0, 0, tachyonSize))
-        logInfo("Added %s on tachyon on %s (size: %s)".format(
-          blockId, blockManagerId.hostPort, Utils.bytesToString(tachyonSize)))
+        _blocks.put(blockId, BlockStatus(storageLevel, 0, 0, externalBlockStoreSize))
+        logInfo("Added %s on ExternalBlockStore on %s (size: %s)".format(
+          blockId, blockManagerId.hostPort, Utils.bytesToString(externalBlockStoreSize)))
       }
     } else if (_blocks.containsKey(blockId)) {
       // If isValid is not true, drop the block.
@@ -482,8 +482,9 @@ private[spark] class BlockManagerInfo(
           blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
       }
       if (blockStatus.storageLevel.useOffHeap) {
-        logInfo("Removed %s on %s on tachyon (size: %s)".format(
-          blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.tachyonSize)))
+        logInfo("Removed %s on %s on externalBlockStore (size: %s)".format(
+          blockId, blockManagerId.hostPort,
+          Utils.bytesToString(blockStatus.externalBlockStoreSize)))
       }
     }
   }
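
BlockStatus now reports the off-heap size under externalBlockStoreSize, and isCached is true when any of the three sizes is non-zero. A minimal sketch:

    import org.apache.spark.storage.{BlockStatus, StorageLevel}

    // A block held only off-heap: zero bytes in memory and on disk.
    val status = BlockStatus(StorageLevel.OFF_HEAP, memSize = 0L, diskSize = 0L,
      externalBlockStoreSize = 1024L)
    assert(status.isCached)  // 0 + 0 + 1024 > 0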

http://git-wip-us.apache.org/repos/asf/spark/blob/36a7a680/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index f89d8d7..1683576 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -60,7 +60,7 @@ private[spark] object BlockManagerMessages {
       var storageLevel: StorageLevel,
       var memSize: Long,
       var diskSize: Long,
-      var tachyonSize: Long)
+      var externalBlockStoreSize: Long)
     extends ToBlockManagerMaster
     with Externalizable {
 
@@ -72,7 +72,7 @@ private[spark] object BlockManagerMessages {
       storageLevel.writeExternal(out)
       out.writeLong(memSize)
       out.writeLong(diskSize)
-      out.writeLong(tachyonSize)
+      out.writeLong(externalBlockStoreSize)
     }
 
     override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -81,7 +81,7 @@ private[spark] object BlockManagerMessages {
       storageLevel = StorageLevel(in)
       memSize = in.readLong()
       diskSize = in.readLong()
-      tachyonSize = in.readLong()
+      externalBlockStoreSize = in.readLong()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/36a7a680/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
new file mode 100644
index 0000000..8964762
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.ByteBuffer
+
+/**
+ * An abstract class that concrete external block managers must extend.
+ * Implementations must have a no-argument constructor and are initialized by init,
+ * which is invoked by ExternalBlockStore. The main input parameter to all methods
+ * is a blockId, the unique identifier of a block within one Spark application.
+ *
+ * The underlying external block manager should avoid namespace conflicts among multiple
+ * Spark applications, for example by creating a different directory for each application
+ * via randomUUID.
+ *
+ */
+private[spark] abstract class ExternalBlockManager {
+
+  override def toString: String = {"External Block Store"}
+
+  /**
+   * Initialize a concrete block manager implementation. Subclasses should initialize their
+   * internal data structures, e.g., the file system, in this function, which is invoked by
+   * ExternalBlockStore right after the class is constructed. The function should throw
+   * IOException on failure.
+   *
+   * @throws java.io.IOException if there is any file system failure during the initialization.
+   */
+  def init(blockManager: BlockManager, executorId: String): Unit
+
+  /**
+   * Drop the block from the underlying external block store, if it exists.
+   * @return true on successfully removing the block
+   *         false if the block could not be removed as it was not found
+   *
+   * @throws java.io.IOException if there is any file system failure in removing the block.
+   */
+  def removeBlock(blockId: BlockId): Boolean
+
+  /**
+   * Used by BlockManager to check the existence of the block in the underlying external
+   * block store.
+   * @return true if the block exists.
+   *         false if the block does not exist.
+   *
+   * @throws java.io.IOException if there is any file system failure in checking
+   *                             the block existence.
+   */
+  def blockExists(blockId: BlockId): Boolean
+
+  /**
+   * Put the given block into the underlying external block store. Note that in the normal
+   * case, putting a block should never fail unless something goes wrong in the underlying
+   * external block store, e.g., a file system failure, in which case an IOException
+   * should be thrown.
+   *
+   * @throws java.io.IOException if there is any file system failure in putting the block.
+   */
+  def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit
+
+  /**
+   * Retrieve the block bytes.
+   * @return Some(ByteBuffer) if the block bytes are successfully retrieved
+   *         None if the block does not exist in the external block store.
+   *
+   * @throws java.io.IOException if there is any file system failure in getting the block.
+   */
+  def getBytes(blockId: BlockId): Option[ByteBuffer]
+
+  /**
+   * Get the size of the block saved in the underlying external block store,
+   * which was saved earlier by putBytes.
+   * @return size of the block
+   *         0 if the block does not exist
+   *
+   * @throws java.io.IOException if there is any file system failure in getting the block size.
+   */
+  def getSize(blockId: BlockId): Long
+
+  /**
+   * Clean up any information persisted in the underlying external block store,
+   * e.g., directories and files. This is invoked by the shutdown hook of
+   * ExternalBlockStore during system shutdown.
+   *
+   */
+  def shutdown()
+}
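
To make the contract concrete, here is a minimal, hypothetical in-memory implementation (the class name and map-backed storage are illustrative only, not part of this patch; since ExternalBlockManager is private[spark], a real implementation has to live under org.apache.spark, as the Tachyon one does):

    package org.apache.spark.storage

    import java.nio.ByteBuffer

    import scala.collection.concurrent.TrieMap

    private[spark] class InMemoryExternalBlockManager extends ExternalBlockManager {
      // Toy backing store; a real backend would talk to an external system.
      private val store = new TrieMap[String, Array[Byte]]

      override def init(blockManager: BlockManager, executorId: String): Unit = ()

      override def removeBlock(blockId: BlockId): Boolean =
        store.remove(blockId.name).isDefined

      override def blockExists(blockId: BlockId): Boolean =
        store.contains(blockId.name)

      override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = {
        val copy = new Array[Byte](bytes.remaining())
        bytes.duplicate().get(copy)  // leave the caller's position untouched
        store.put(blockId.name, copy)
      }

      override def getBytes(blockId: BlockId): Option[ByteBuffer] =
        store.get(blockId.name).map(ByteBuffer.wrap)

      override def getSize(blockId: BlockId): Long =
        store.get(blockId.name).map(_.length.toLong).getOrElse(0L)

      override def shutdown(): Unit = store.clear()
    }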

http://git-wip-us.apache.org/repos/asf/spark/blob/36a7a680/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
new file mode 100644
index 0000000..0bf7703
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.ByteBuffer
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import scala.util.control.NonFatal
+
+
+/**
+ * Stores BlockManager blocks in an external block store.
+ * Any exception from the underlying implementation is caught
+ * and converted into the expected failure value.
+ */
+private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId: String)
+  extends BlockStore(blockManager: BlockManager) with Logging {
+
+  lazy val externalBlockManager: Option[ExternalBlockManager] = createBlkManager()
+
+  logInfo("ExternalBlockStore started")
+
+  override def getSize(blockId: BlockId): Long = {
+    try {
+      externalBlockManager.map(_.getSize(blockId)).getOrElse(0)
+    } catch {
+      case NonFatal(t) =>
+        logError(s"error in getSize from $blockId", t)
+        0L
+    }
+  }
+
+  override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = {
+    putIntoExternalBlockStore(blockId, bytes, returnValues = true)
+  }
+
+  override def putArray(
+      blockId: BlockId,
+      values: Array[Any],
+      level: StorageLevel,
+      returnValues: Boolean): PutResult = {
+    putIterator(blockId, values.toIterator, level, returnValues)
+  }
+
+  override def putIterator(
+      blockId: BlockId,
+      values: Iterator[Any],
+      level: StorageLevel,
+      returnValues: Boolean): PutResult = {
+    logDebug(s"Attempting to write values for block $blockId")
+    val bytes = blockManager.dataSerialize(blockId, values)
+    putIntoExternalBlockStore(blockId, bytes, returnValues)
+  }
+
+  private def putIntoExternalBlockStore(
+      blockId: BlockId,
+      bytes: ByteBuffer,
+      returnValues: Boolean): PutResult = {
+    // So that we do not modify the input offsets!
+    // duplicate() does not copy the buffer, so it is inexpensive.
+    val byteBuffer = bytes.duplicate()
+    byteBuffer.rewind()
+    logDebug(s"Attempting to put block $blockId into ExtBlk store")
+    // We should never get here if externalBlockManager is None, but handle it anyway for safety.
+    try {
+      val startTime = System.currentTimeMillis
+      if (externalBlockManager.isDefined) {
+        externalBlockManager.get.putBytes(blockId, bytes)
+        val finishTime = System.currentTimeMillis
+        logDebug("Block %s stored as %s file in ExternalBlockStore in %d ms".format(
+          blockId, Utils.bytesToString(byteBuffer.limit), finishTime - startTime))
+
+        if (returnValues) {
+          PutResult(bytes.limit(), Right(bytes.duplicate()))
+        } else {
+          PutResult(bytes.limit(), null)
+        }
+      } else {
+        logError(s"error in putBytes $blockId")
+        PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
+      }
+    } catch {
+      case NonFatal(t) =>
+        logError(s"error in putBytes $blockId", t)
+        PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
+    }
+  }
+
+  // We assume the block is removed even if an exception is thrown.
+  override def remove(blockId: BlockId): Boolean = {
+    try {
+      externalBlockManager.map(_.removeBlock(blockId)).getOrElse(true)
+    } catch {
+      case NonFatal(t) =>
+        logError(s"error in removing $blockId", t)
+        true
+    }
+  }
+
+  override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
+    getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
+  }
+
+  override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
+    try {
+      externalBlockManager.flatMap(_.getBytes(blockId))
+    } catch {
+      case NonFatal(t) =>
+        logError(s"error in getBytes from $blockId", t)
+        None
+    }
+  }
+
+  override def contains(blockId: BlockId): Boolean = {
+    try {
+      val ret = externalBlockManager.map(_.blockExists(blockId)).getOrElse(false)
+      if (!ret) {
+        logInfo(s"remove block $blockId")
+        blockManager.removeBlock(blockId, true)
+      }
+      ret
+    } catch {
+      case NonFatal(t) =>
+        logError(s"error in getBytes from $blockId", t)
+        false
+    }
+  }
+
+  private def addShutdownHook() {
+    Runtime.getRuntime.addShutdownHook(new Thread("ExternalBlockStore shutdown hook") {
+      override def run(): Unit = Utils.logUncaughtExceptions {
+        logDebug("Shutdown hook called")
+        externalBlockManager.map(_.shutdown())
+      }
+    })
+  }
+
+  // Create concrete block manager and fall back to Tachyon by default for backward compatibility.
+  private def createBlkManager(): Option[ExternalBlockManager] = {
+    val clsName = blockManager.conf.getOption(ExternalBlockStore.BLOCK_MANAGER_NAME)
+      .getOrElse(ExternalBlockStore.DEFAULT_BLOCK_MANAGER_NAME)
+
+    try {
+      val instance = Class.forName(clsName)
+        .newInstance()
+        .asInstanceOf[ExternalBlockManager]
+      instance.init(blockManager, executorId)
+      addShutdownHook();
+      Some(instance)
+    } catch {
+      case NonFatal(t) =>
+        logError("Cannot initialize external block store", t)
+        None
+    }
+  }
+}
+
+private[spark] object ExternalBlockStore extends Logging {
+  val MAX_DIR_CREATION_ATTEMPTS = 10
+  val SUB_DIRS_PER_DIR = "64"
+  val BASE_DIR = "spark.externalBlockStore.baseDir"
+  val FOLD_NAME = "spark.externalBlockStore.folderName"
+  val MASTER_URL = "spark.externalBlockStore.url"
+  val BLOCK_MANAGER_NAME = "spark.externalBlockStore.blockManager"
+  val DEFAULT_BLOCK_MANAGER_NAME = "org.apache.spark.storage.TachyonBlockManager"
+}
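
ExternalBlockStore loads the configured class reflectively via Class.forName and calls init on it, so swapping backends is a configuration change. A sketch using the hypothetical class from above:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Defaults to org.apache.spark.storage.TachyonBlockManager when unset.
      .set("spark.externalBlockStore.blockManager",
        "org.apache.spark.storage.InMemoryExternalBlockManager")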

http://git-wip-us.apache.org/repos/asf/spark/blob/36a7a680/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index 034525b..ad53a3e 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -32,16 +32,17 @@ class RDDInfo(
   var numCachedPartitions = 0
   var memSize = 0L
   var diskSize = 0L
-  var tachyonSize = 0L
+  var externalBlockStoreSize = 0L
 
-  def isCached: Boolean = (memSize + diskSize + tachyonSize > 0) && numCachedPartitions > 0
+  def isCached: Boolean =
+    (memSize + diskSize + externalBlockStoreSize > 0) && numCachedPartitions > 0
 
   override def toString: String = {
     import Utils.bytesToString
     ("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " +
-      "MemorySize: %s; TachyonSize: %s; DiskSize: %s").format(
+      "MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s").format(
         name, id, storageLevel.toString, numCachedPartitions, numPartitions,
-        bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize))
+        bytesToString(memSize), bytesToString(externalBlockStoreSize), bytesToString(diskSize))
   }
 
   override def compare(that: RDDInfo): Int = {

http://git-wip-us.apache.org/repos/asf/spark/blob/36a7a680/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 134abea..703bce3 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -26,9 +26,9 @@ import org.apache.spark.util.Utils
 /**
  * :: DeveloperApi ::
  * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
- * or Tachyon, whether to drop the RDD to disk if it falls out of memory or Tachyon , whether to
- * keep the data in memory in a serialized format, and whether to replicate the RDD partitions on
- * multiple nodes.
+ * or ExternalBlockStore, whether to drop the RDD to disk if it falls out of memory or
+ * ExternalBlockStore, whether to keep the data in memory in a serialized format, and whether
+ * to replicate the RDD partitions on multiple nodes.
  *
  * The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants
  * for commonly useful storage levels. To create your own storage level object, use the
@@ -126,7 +126,7 @@ class StorageLevel private(
     var result = ""
     result += (if (useDisk) "Disk " else "")
     result += (if (useMemory) "Memory " else "")
-    result += (if (useOffHeap) "Tachyon " else "")
+    result += (if (useOffHeap) "ExternalBlockStore " else "")
     result += (if (deserialized) "Deserialized " else "Serialized ")
     result += s"${replication}x Replicated"
     result
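
The user-facing entry point is unchanged: persisting with the off-heap storage level (useOffHeap = true) now routes to whichever ExternalBlockManager is configured. A minimal sketch, assuming an existing SparkContext sc:

    import org.apache.spark.storage.StorageLevel

    val rdd = sc.parallelize(1 to 1000)
    rdd.persist(StorageLevel.OFF_HEAP)  // stored via the external block store
    rdd.count()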

http://git-wip-us.apache.org/repos/asf/spark/blob/36a7a680/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 2bd6b74..c4ac300 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -199,33 +199,34 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
     val oldBlockStatus = getBlock(blockId).getOrElse(BlockStatus.empty)
     val changeInMem = newBlockStatus.memSize - oldBlockStatus.memSize
     val changeInDisk = newBlockStatus.diskSize - oldBlockStatus.diskSize
-    val changeInTachyon = newBlockStatus.tachyonSize - oldBlockStatus.tachyonSize
+    val changeInExternalBlockStore =
+      newBlockStatus.externalBlockStoreSize - oldBlockStatus.externalBlockStoreSize
     val level = newBlockStatus.storageLevel
 
     // Compute new info from old info
-    val (oldMem, oldDisk, oldTachyon) = blockId match {
+    val (oldMem, oldDisk, oldExternalBlockStore) = blockId match {
       case RDDBlockId(rddId, _) =>
         _rddStorageInfo.get(rddId)
-          .map { case (mem, disk, tachyon, _) => (mem, disk, tachyon) }
+          .map { case (mem, disk, externalBlockStore, _) => (mem, disk, externalBlockStore) }
           .getOrElse((0L, 0L, 0L))
       case _ =>
         _nonRddStorageInfo
     }
     val newMem = math.max(oldMem + changeInMem, 0L)
     val newDisk = math.max(oldDisk + changeInDisk, 0L)
-    val newTachyon = math.max(oldTachyon + changeInTachyon, 0L)
+    val newExternalBlockStore = math.max(oldExternalBlockStore + changeInExternalBlockStore, 0L)
 
     // Set the correct info
     blockId match {
       case RDDBlockId(rddId, _) =>
         // If this RDD is no longer persisted, remove it
-        if (newMem + newDisk + newTachyon == 0) {
+        if (newMem + newDisk + newExternalBlockStore == 0) {
           _rddStorageInfo.remove(rddId)
         } else {
-          _rddStorageInfo(rddId) = (newMem, newDisk, newTachyon, level)
+          _rddStorageInfo(rddId) = (newMem, newDisk, newExternalBlockStore, level)
         }
       case _ =>
-        _nonRddStorageInfo = (newMem, newDisk, newTachyon)
+        _nonRddStorageInfo = (newMem, newDisk, newExternalBlockStore)
     }
   }
 
@@ -247,13 +248,13 @@ private[spark] object StorageUtils {
       val numCachedPartitions = statuses.map(_.numRddBlocksById(rddId)).sum
       val memSize = statuses.map(_.memUsedByRdd(rddId)).sum
       val diskSize = statuses.map(_.diskUsedByRdd(rddId)).sum
-      val tachyonSize = statuses.map(_.offHeapUsedByRdd(rddId)).sum
+      val externalBlockStoreSize = statuses.map(_.offHeapUsedByRdd(rddId)).sum
 
       rddInfo.storageLevel = storageLevel
       rddInfo.numCachedPartitions = numCachedPartitions
       rddInfo.memSize = memSize
       rddInfo.diskSize = diskSize
-      rddInfo.tachyonSize = tachyonSize
+      rddInfo.externalBlockStoreSize = externalBlockStoreSize
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/36a7a680/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index 583f1fd..bdc6276 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -17,13 +17,16 @@
 
 package org.apache.spark.storage
 
+import java.io.IOException
+import java.nio.ByteBuffer
 import java.text.SimpleDateFormat
 import java.util.{Date, Random}
 
+import com.google.common.io.ByteStreams
+import tachyon.client.{ReadType, WriteType, TachyonFS, TachyonFile}
 import tachyon.TachyonURI
-import tachyon.client.{TachyonFile, TachyonFS}
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkException, SparkConf, Logging}
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.util.Utils
 
@@ -32,32 +35,94 @@ import org.apache.spark.util.Utils
  * Creates and maintains the logical mapping between logical blocks and tachyon fs locations. By
  * default, one block is mapped to one file with a name given by its BlockId.
  *
- * @param rootDirs The directories to use for storing block files. Data will be hashed among these.
  */
-private[spark] class TachyonBlockManager(
-    blockManager: BlockManager,
-    rootDirs: String,
-    val master: String)
-  extends Logging {
+private[spark] class TachyonBlockManager() extends ExternalBlockManager with Logging {
 
-  val client = if (master != null && master != "") TachyonFS.get(new TachyonURI(master)) else null
-
-  if (client == null) {
-    logError("Failed to connect to the Tachyon as the master address is not configured")
-    System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_INITIALIZE)
-  }
-
-  private val MAX_DIR_CREATION_ATTEMPTS = 10
-  private val subDirsPerTachyonDir =
-    blockManager.conf.get("spark.tachyonStore.subDirectories", "64").toInt
+  var blockManager: BlockManager = _
+  var rootDirs: String = _
+  var master: String = _
+  var client: tachyon.client.TachyonFS = _
+  private var subDirsPerTachyonDir: Int = _
 
   // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName;
   // then, inside this directory, create multiple subdirectories that we will hash files into,
   // in order to avoid having really large inodes at the top level in Tachyon.
-  private val tachyonDirs: Array[TachyonFile] = createTachyonDirs()
-  private val subDirs = Array.fill(tachyonDirs.length)(new Array[TachyonFile](subDirsPerTachyonDir))
+  private var tachyonDirs: Array[TachyonFile] = _
+  private var subDirs: Array[Array[tachyon.client.TachyonFile]] = _
+
+
+  override def init(blockManager: BlockManager, executorId: String): Unit = {
+    this.blockManager = blockManager
+    val storeDir = blockManager.conf.get(ExternalBlockStore.BASE_DIR, "/tmp_spark_tachyon")
+    val appFolderName = blockManager.conf.get(ExternalBlockStore.FOLD_NAME)
+
+    rootDirs = s"$storeDir/$appFolderName/$executorId"
+    master = blockManager.conf.get(ExternalBlockStore.MASTER_URL, "tachyon://localhost:19998")
+    client = if (master != null && master != "") TachyonFS.get(new TachyonURI(master)) else null
+    // The original implementation called System.exit; we changed it so Spark can run
+    // without external block store support.
+    if (client == null) {
+      logError("Failed to connect to the Tachyon as the master address is not configured")
+      throw new IOException("Failed to connect to the Tachyon as the master " +
+        "address is not configured")
+    }
+    subDirsPerTachyonDir = blockManager.conf.get("spark.externalBlockStore.subDirectories",
+      ExternalBlockStore.SUB_DIRS_PER_DIR).toInt
+
+    // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName;
+    // then, inside this directory, create multiple subdirectories that we will hash files into,
+    // in order to avoid having really large inodes at the top level in Tachyon.
+    tachyonDirs = createTachyonDirs()
+    subDirs = Array.fill(tachyonDirs.length)(new Array[TachyonFile](subDirsPerTachyonDir))
+    tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir))
+  }
+
+  override def toString: String = {"ExternalBlockStore-Tachyon"}
+
+  override def removeBlock(blockId: BlockId): Boolean = {
+    val file = getFile(blockId)
+    if (fileExists(file)) {
+      removeFile(file)
+    } else {
+      false
+    }
+  }
+
+  override def blockExists(blockId: BlockId): Boolean = {
+    val file = getFile(blockId)
+    fileExists(file)
+  }
 
-  addShutdownHook()
+  override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = {
+    val file = getFile(blockId)
+    val os = file.getOutStream(WriteType.TRY_CACHE)
+    os.write(bytes.array())
+    os.close()
+  }
+
+  override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
+    val file = getFile(blockId)
+    if (file == null || file.getLocationHosts.size == 0) {
+      return None
+    }
+    val is = file.getInStream(ReadType.CACHE)
+    assert (is != null)
+    try {
+      val size = file.length
+      val bs = new Array[Byte](size.asInstanceOf[Int])
+      ByteStreams.readFully(is, bs)
+      Some(ByteBuffer.wrap(bs))
+    } catch {
+      case ioe: IOException =>
+        logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
+        None
+    } finally {
+      is.close()
+    }
+  }
+
+  override def getSize(blockId: BlockId): Long = {
+    getFile(blockId.name).length
+  }
 
   def removeFile(file: TachyonFile): Boolean = {
     client.delete(new TachyonURI(file.getPath()), false)
@@ -109,7 +174,7 @@ private[spark] class TachyonBlockManager(
       var tachyonDirId: String = null
       var tries = 0
       val rand = new Random()
-      while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) {
+      while (!foundLocalDir && tries < ExternalBlockStore.MAX_DIR_CREATION_ATTEMPTS) {
         tries += 1
         try {
           tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
@@ -124,30 +189,27 @@ private[spark] class TachyonBlockManager(
         }
       }
       if (!foundLocalDir) {
-        logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + " attempts to create tachyon dir in " +
-          rootDir)
-        System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_CREATE_DIR)
+        logError("Failed " + ExternalBlockStore.MAX_DIR_CREATION_ATTEMPTS
+          + " attempts to create tachyon dir in " + rootDir)
+        System.exit(ExecutorExitCode.EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR)
       }
       logInfo("Created tachyon directory at " + tachyonDir)
       tachyonDir
     }
   }
 
-  private def addShutdownHook() {
-    tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir))
-    Utils.addShutdownHook { () =>
-      logDebug("Shutdown hook called")
-      tachyonDirs.foreach { tachyonDir =>
-        try {
-          if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
-            Utils.deleteRecursively(tachyonDir, client)
-          }
-        } catch {
-          case e: Exception =>
-            logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
+  override def shutdown() {
+    logDebug("Shutdown hook called")
+    tachyonDirs.foreach { tachyonDir =>
+      try {
+        if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
+          Utils.deleteRecursively(tachyonDir, client)
         }
+      } catch {
+        case e: Exception =>
+          logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
       }
-      client.close()
     }
+    client.close()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/36a7a680/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala b/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala
deleted file mode 100644
index 65fa817..0000000
--- a/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import tachyon.client.TachyonFile
-
-/**
- * References a particular segment of a file (potentially the entire file), based off an offset and
- * a length.
- */
-private[spark] class TachyonFileSegment(val file: TachyonFile, val offset: Long, val length: Long) {
-  override def toString: String = {
-    "(name=%s, offset=%d, length=%d)".format(file.getPath(), offset, length)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/36a7a680/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
deleted file mode 100644
index 233d1e2..0000000
--- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.io.IOException
-import java.nio.ByteBuffer
-
-import com.google.common.io.ByteStreams
-import tachyon.client.{ReadType, WriteType}
-
-import org.apache.spark.Logging
-import org.apache.spark.util.Utils
-
-/**
- * Stores BlockManager blocks on Tachyon.
- */
-private[spark] class TachyonStore(
-    blockManager: BlockManager,
-    tachyonManager: TachyonBlockManager)
-  extends BlockStore(blockManager: BlockManager) with Logging {
-
-  logInfo("TachyonStore started")
-
-  override def getSize(blockId: BlockId): Long = {
-    tachyonManager.getFile(blockId.name).length
-  }
-
-  override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = {
-    putIntoTachyonStore(blockId, bytes, returnValues = true)
-  }
-
-  override def putArray(
-      blockId: BlockId,
-      values: Array[Any],
-      level: StorageLevel,
-      returnValues: Boolean): PutResult = {
-    putIterator(blockId, values.toIterator, level, returnValues)
-  }
-
-  override def putIterator(
-      blockId: BlockId,
-      values: Iterator[Any],
-      level: StorageLevel,
-      returnValues: Boolean): PutResult = {
-    logDebug(s"Attempting to write values for block $blockId")
-    val bytes = blockManager.dataSerialize(blockId, values)
-    putIntoTachyonStore(blockId, bytes, returnValues)
-  }
-
-  private def putIntoTachyonStore(
-      blockId: BlockId,
-      bytes: ByteBuffer,
-      returnValues: Boolean): PutResult = {
-    // So that we do not modify the input offsets !
-    // duplicate does not copy buffer, so inexpensive
-    val byteBuffer = bytes.duplicate()
-    byteBuffer.rewind()
-    logDebug(s"Attempting to put block $blockId into Tachyon")
-    val startTime = System.currentTimeMillis
-    val file = tachyonManager.getFile(blockId)
-    val os = file.getOutStream(WriteType.TRY_CACHE)
-    os.write(byteBuffer.array())
-    os.close()
-    val finishTime = System.currentTimeMillis
-    logDebug("Block %s stored as %s file in Tachyon in %d ms".format(
-      blockId, Utils.bytesToString(byteBuffer.limit), finishTime - startTime))
-
-    if (returnValues) {
-      PutResult(bytes.limit(), Right(bytes.duplicate()))
-    } else {
-      PutResult(bytes.limit(), null)
-    }
-  }
-
-  override def remove(blockId: BlockId): Boolean = {
-    val file = tachyonManager.getFile(blockId)
-    if (tachyonManager.fileExists(file)) {
-      tachyonManager.removeFile(file)
-    } else {
-      false
-    }
-  }
-
-  override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
-    getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
-  }
-
-  override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
-    val file = tachyonManager.getFile(blockId)
-    if (file == null || file.getLocationHosts.size == 0) {
-      return None
-    }
-    val is = file.getInStream(ReadType.CACHE)
-    assert (is != null)
-    try {
-      val size = file.length
-      val bs = new Array[Byte](size.asInstanceOf[Int])
-      ByteStreams.readFully(is, bs)
-      Some(ByteBuffer.wrap(bs))
-    } catch {
-      case ioe: IOException =>
-        logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
-        None
-    } finally {
-      is.close()
-    }
-  }
-
-  override def contains(blockId: BlockId): Boolean = {
-    val file = tachyonManager.getFile(blockId)
-    tachyonManager.fileExists(file)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/36a7a680/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
index 6ced605..59dc6b5 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
@@ -42,7 +42,7 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
     "Cached Partitions",
     "Fraction Cached",
     "Size in Memory",
-    "Size in Tachyon",
+    "Size in ExternalBlockStore",
     "Size on Disk")
 
   /** Render an HTML row representing an RDD */
@@ -59,7 +59,7 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
       <td>{rdd.numCachedPartitions}</td>
       <td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td>
       <td sorttable_customkey={rdd.memSize.toString}>{Utils.bytesToString(rdd.memSize)}</td>
-      <td sorttable_customkey={rdd.tachyonSize.toString}>{Utils.bytesToString(rdd.tachyonSize)}</td>
+      <td sorttable_customkey={rdd.externalBlockStoreSize.toString}>{Utils.bytesToString(rdd.externalBlockStoreSize)}</td>
       <td sorttable_customkey={rdd.diskSize.toString} >{Utils.bytesToString(rdd.diskSize)}</td>
     </tr>
     // scalastyle:on

http://git-wip-us.apache.org/repos/asf/spark/blob/36a7a680/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 474f79f..44d2749 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -373,14 +373,14 @@ private[spark] object JsonProtocol {
     ("Number of Partitions" -> rddInfo.numPartitions) ~
     ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
     ("Memory Size" -> rddInfo.memSize) ~
-    ("Tachyon Size" -> rddInfo.tachyonSize) ~
+    ("ExternalBlockStore Size" -> rddInfo.externalBlockStoreSize) ~
     ("Disk Size" -> rddInfo.diskSize)
   }
 
   def storageLevelToJson(storageLevel: StorageLevel): JValue = {
     ("Use Disk" -> storageLevel.useDisk) ~
     ("Use Memory" -> storageLevel.useMemory) ~
-    ("Use Tachyon" -> storageLevel.useOffHeap) ~
+    ("Use ExternalBlockStore" -> storageLevel.useOffHeap) ~
     ("Deserialized" -> storageLevel.deserialized) ~
     ("Replication" -> storageLevel.replication)
   }
@@ -389,7 +389,7 @@ private[spark] object JsonProtocol {
     val storageLevel = storageLevelToJson(blockStatus.storageLevel)
     ("Storage Level" -> storageLevel) ~
     ("Memory Size" -> blockStatus.memSize) ~
-    ("Tachyon Size" -> blockStatus.tachyonSize) ~
+    ("ExternalBlockStore Size" -> blockStatus.externalBlockStoreSize) ~
     ("Disk Size" -> blockStatus.diskSize)
   }
 
@@ -787,13 +787,15 @@ private[spark] object JsonProtocol {
     val numPartitions = (json \ "Number of Partitions").extract[Int]
     val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int]
     val memSize = (json \ "Memory Size").extract[Long]
-    val tachyonSize = (json \ "Tachyon Size").extract[Long]
+    // fall back to Tachyon for backward compatibility
+    val externalBlockStoreSize = (json \ "ExternalBlockStore Size").toSome
+      .getOrElse(json \ "Tachyon Size").extract[Long]
     val diskSize = (json \ "Disk Size").extract[Long]
 
     val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel)
     rddInfo.numCachedPartitions = numCachedPartitions
     rddInfo.memSize = memSize
-    rddInfo.tachyonSize = tachyonSize
+    rddInfo.externalBlockStoreSize = externalBlockStoreSize
     rddInfo.diskSize = diskSize
     rddInfo
   }
@@ -801,18 +803,22 @@ private[spark] object JsonProtocol {
   def storageLevelFromJson(json: JValue): StorageLevel = {
     val useDisk = (json \ "Use Disk").extract[Boolean]
     val useMemory = (json \ "Use Memory").extract[Boolean]
-    val useTachyon = (json \ "Use Tachyon").extract[Boolean]
+    // fall back to Tachyon for backward compatibility
+    val useExternalBlockStore = (json \ "Use ExternalBlockStore").toSome
+      .getOrElse(json \ "Use Tachyon").extract[Boolean]
     val deserialized = (json \ "Deserialized").extract[Boolean]
     val replication = (json \ "Replication").extract[Int]
-    StorageLevel(useDisk, useMemory, useTachyon, deserialized, replication)
+    StorageLevel(useDisk, useMemory, useExternalBlockStore, deserialized, replication)
   }
 
   def blockStatusFromJson(json: JValue): BlockStatus = {
     val storageLevel = storageLevelFromJson(json \ "Storage Level")
     val memorySize = (json \ "Memory Size").extract[Long]
     val diskSize = (json \ "Disk Size").extract[Long]
-    val tachyonSize = (json \ "Tachyon Size").extract[Long]
-    BlockStatus(storageLevel, memorySize, diskSize, tachyonSize)
+    // fall back to Tachyon for backward compatibility
+    val externalBlockStoreSize = (json \ "ExternalBlockStore Size").toSome
+      .getOrElse(json \ "Tachyon Size").extract[Long]
+    BlockStatus(storageLevel, memorySize, diskSize, externalBlockStoreSize)
   }
 
   def executorInfoFromJson(json: JValue): ExecutorInfo = {
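
The toSome-based lookups above keep event logs written by older Spark
versions readable: each field prefers the new "ExternalBlockStore" key and
falls back to the legacy "Tachyon" key when it is absent. A standalone
illustration of the json4s idiom (the sample JSON fragment is invented for
demonstration):

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    implicit val formats = DefaultFormats

    // An event-log fragment written by an older Spark, using the old key.
    val legacyJson = parse("""{"Tachyon Size": 42, "Disk Size": 10}""")

    // (json \ key) yields JNothing for a missing key, and toSome maps
    // JNothing to None, so the lookup falls through to the legacy key.
    val size = (legacyJson \ "ExternalBlockStore Size").toSome
      .getOrElse(legacyJson \ "Tachyon Size")
      .extract[Long]  // 42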

http://git-wip-us.apache.org/repos/asf/spark/blob/36a7a680/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index f5b410f..151955e 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -526,6 +526,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
   test("tachyon storage") {
     // TODO Make the spark.test.tachyon.enable true after using tachyon 0.5.0 testing jar.
     val tachyonUnitTestEnabled = conf.getBoolean("spark.test.tachyon.enable", false)
+    conf.set(ExternalBlockStore.BLOCK_MANAGER_NAME, ExternalBlockStore.DEFAULT_BLOCK_MANAGER_NAME)
     if (tachyonUnitTestEnabled) {
       store = makeBlockManager(1200)
       val a1 = new Array[Byte](400)
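
The two constants set above come from the new ExternalBlockStore object.
Judging from the configuration keys documented in docs/configuration.md
further down, they presumably read roughly as follows (an inferred sketch,
not quoted from the patch):

    // Inferred sketch of the configuration-key constants used in the test.
    private[spark] object ExternalBlockStore {
      val BLOCK_MANAGER_NAME = "spark.externalBlockStore.blockManager"
      val DEFAULT_BLOCK_MANAGER_NAME =
        "org.apache.spark.storage.TachyonBlockManager"
      val BASE_DIR = "spark.externalBlockStore.baseDir"
      val MASTER_URL = "spark.externalBlockStore.url"
    }

In other words, the test pins the Tachyon-backed implementation explicitly
rather than leaving the block manager unset.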

http://git-wip-us.apache.org/repos/asf/spark/blob/36a7a680/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
index ef5c55f..ee1071c 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
@@ -240,11 +240,11 @@ class StorageSuite extends FunSuite {
     assert(status.rddBlocksById(1000).size === status.numRddBlocksById(1000))
   }
 
-  test("storage status memUsed, diskUsed, tachyonUsed") {
+  test("storage status memUsed, diskUsed, externalBlockStoreUsed") {
     val status = storageStatus2
     def actualMemUsed: Long = status.blocks.values.map(_.memSize).sum
     def actualDiskUsed: Long = status.blocks.values.map(_.diskSize).sum
-    def actualOffHeapUsed: Long = status.blocks.values.map(_.tachyonSize).sum
+    def actualOffHeapUsed: Long = status.blocks.values.map(_.externalBlockStoreSize).sum
     assert(status.memUsed === actualMemUsed)
     assert(status.diskUsed === actualDiskUsed)
     assert(status.offHeapUsed === actualOffHeapUsed)
@@ -300,12 +300,12 @@ class StorageSuite extends FunSuite {
     assert(rddInfos(0).numCachedPartitions === 5)
     assert(rddInfos(0).memSize === 5L)
     assert(rddInfos(0).diskSize === 10L)
-    assert(rddInfos(0).tachyonSize === 0L)
+    assert(rddInfos(0).externalBlockStoreSize === 0L)
     assert(rddInfos(1).storageLevel === memAndDisk)
     assert(rddInfos(1).numCachedPartitions === 3)
     assert(rddInfos(1).memSize === 3L)
     assert(rddInfos(1).diskSize === 6L)
-    assert(rddInfos(1).tachyonSize === 0L)
+    assert(rddInfos(1).externalBlockStoreSize === 0L)
   }
 
   test("StorageUtils.getRddBlockLocations") {

http://git-wip-us.apache.org/repos/asf/spark/blob/36a7a680/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index 3744e47..0ea9ea4 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -133,12 +133,12 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     bus.postToAll(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo, metrics1))
     assert(storageListener._rddInfoMap(0).memSize === 800L)
     assert(storageListener._rddInfoMap(0).diskSize === 400L)
-    assert(storageListener._rddInfoMap(0).tachyonSize === 200L)
+    assert(storageListener._rddInfoMap(0).externalBlockStoreSize === 200L)
     assert(storageListener._rddInfoMap(0).numCachedPartitions === 3)
     assert(storageListener._rddInfoMap(0).isCached)
     assert(storageListener._rddInfoMap(1).memSize === 0L)
     assert(storageListener._rddInfoMap(1).diskSize === 240L)
-    assert(storageListener._rddInfoMap(1).tachyonSize === 0L)
+    assert(storageListener._rddInfoMap(1).externalBlockStoreSize === 0L)
     assert(storageListener._rddInfoMap(1).numCachedPartitions === 1)
     assert(storageListener._rddInfoMap(1).isCached)
     assert(!storageListener._rddInfoMap(2).isCached)
@@ -155,7 +155,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     bus.postToAll(SparkListenerTaskEnd(2, 0, "obliteration", Success, taskInfo, metrics2))
     assert(storageListener._rddInfoMap(0).memSize === 400L)
     assert(storageListener._rddInfoMap(0).diskSize === 400L)
-    assert(storageListener._rddInfoMap(0).tachyonSize === 200L)
+    assert(storageListener._rddInfoMap(0).externalBlockStoreSize === 200L)
     assert(storageListener._rddInfoMap(0).numCachedPartitions === 2)
     assert(storageListener._rddInfoMap(0).isCached)
     assert(!storageListener._rddInfoMap(1).isCached)

http://git-wip-us.apache.org/repos/asf/spark/blob/36a7a680/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index a2be724..2d039cb 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -785,14 +785,14 @@ class JsonProtocolSuite extends FunSuite {
       |        "Storage Level": {
       |          "Use Disk": true,
       |          "Use Memory": true,
-      |          "Use Tachyon": false,
+      |          "Use ExternalBlockStore": false,
       |          "Deserialized": true,
       |          "Replication": 1
       |        },
       |        "Number of Partitions": 201,
       |        "Number of Cached Partitions": 301,
       |        "Memory Size": 401,
-      |        "Tachyon Size": 0,
+      |        "ExternalBlockStore Size": 0,
       |        "Disk Size": 501
       |      }
       |    ],
@@ -969,12 +969,12 @@ class JsonProtocolSuite extends FunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use Tachyon": false,
+      |            "Use ExternalBlockStore": false,
       |            "Deserialized": false,
       |            "Replication": 2
       |          },
       |          "Memory Size": 0,
-      |          "Tachyon Size": 0,
+      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 0
       |        }
       |      }
@@ -1052,12 +1052,12 @@ class JsonProtocolSuite extends FunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use Tachyon": false,
+      |            "Use ExternalBlockStore": false,
       |            "Deserialized": false,
       |            "Replication": 2
       |          },
       |          "Memory Size": 0,
-      |          "Tachyon Size": 0,
+      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 0
       |        }
       |      }
@@ -1135,12 +1135,12 @@ class JsonProtocolSuite extends FunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use Tachyon": false,
+      |            "Use ExternalBlockStore": false,
       |            "Deserialized": false,
       |            "Replication": 2
       |          },
       |          "Memory Size": 0,
-      |          "Tachyon Size": 0,
+      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 0
       |        }
       |      }
@@ -1168,14 +1168,14 @@ class JsonProtocolSuite extends FunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use Tachyon": false,
+      |            "Use ExternalBlockStore": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
       |          "Number of Partitions": 200,
       |          "Number of Cached Partitions": 300,
       |          "Memory Size": 400,
-      |          "Tachyon Size": 0,
+      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 500
       |        }
       |      ],
@@ -1207,14 +1207,14 @@ class JsonProtocolSuite extends FunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use Tachyon": false,
+      |            "Use ExternalBlockStore": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
       |          "Number of Partitions": 400,
       |          "Number of Cached Partitions": 600,
       |          "Memory Size": 800,
-      |          "Tachyon Size": 0,
+      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 1000
       |        },
       |        {
@@ -1223,14 +1223,14 @@ class JsonProtocolSuite extends FunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use Tachyon": false,
+      |            "Use ExternalBlockStore": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
       |          "Number of Partitions": 401,
       |          "Number of Cached Partitions": 601,
       |          "Memory Size": 801,
-      |          "Tachyon Size": 0,
+      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 1001
       |        }
       |      ],
@@ -1262,14 +1262,14 @@ class JsonProtocolSuite extends FunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use Tachyon": false,
+      |            "Use ExternalBlockStore": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
       |          "Number of Partitions": 600,
       |          "Number of Cached Partitions": 900,
       |          "Memory Size": 1200,
-      |          "Tachyon Size": 0,
+      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 1500
       |        },
       |        {
@@ -1278,14 +1278,14 @@ class JsonProtocolSuite extends FunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use Tachyon": false,
+      |            "Use ExternalBlockStore": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
       |          "Number of Partitions": 601,
       |          "Number of Cached Partitions": 901,
       |          "Memory Size": 1201,
-      |          "Tachyon Size": 0,
+      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 1501
       |        },
       |        {
@@ -1294,14 +1294,14 @@ class JsonProtocolSuite extends FunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use Tachyon": false,
+      |            "Use ExternalBlockStore": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
       |          "Number of Partitions": 602,
       |          "Number of Cached Partitions": 902,
       |          "Memory Size": 1202,
-      |          "Tachyon Size": 0,
+      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 1502
       |        }
       |      ],
@@ -1333,14 +1333,14 @@ class JsonProtocolSuite extends FunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use Tachyon": false,
+      |            "Use ExternalBlockStore": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
       |          "Number of Partitions": 800,
       |          "Number of Cached Partitions": 1200,
       |          "Memory Size": 1600,
-      |          "Tachyon Size": 0,
+      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 2000
       |        },
       |        {
@@ -1349,14 +1349,14 @@ class JsonProtocolSuite extends FunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use Tachyon": false,
+      |            "Use ExternalBlockStore": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
       |          "Number of Partitions": 801,
       |          "Number of Cached Partitions": 1201,
       |          "Memory Size": 1601,
-      |          "Tachyon Size": 0,
+      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 2001
       |        },
       |        {
@@ -1365,14 +1365,14 @@ class JsonProtocolSuite extends FunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use Tachyon": false,
+      |            "Use ExternalBlockStore": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
       |          "Number of Partitions": 802,
       |          "Number of Cached Partitions": 1202,
       |          "Memory Size": 1602,
-      |          "Tachyon Size": 0,
+      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 2002
       |        },
       |        {
@@ -1381,14 +1381,14 @@ class JsonProtocolSuite extends FunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use Tachyon": false,
+      |            "Use ExternalBlockStore": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
       |          "Number of Partitions": 803,
       |          "Number of Cached Partitions": 1203,
       |          "Memory Size": 1603,
-      |          "Tachyon Size": 0,
+      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 2003
       |        }
       |      ],

http://git-wip-us.apache.org/repos/asf/spark/blob/36a7a680/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 72105fe..7239b25 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1,4 +1,4 @@
----
+---
 layout: global
 displayTitle: Spark Configuration
 title: Configuration
@@ -843,19 +843,27 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.tachyonStore.baseDir</code></td>
+  <td><code>spark.externalBlockStore.blockManager</code></td>
+  <td>org.apache.spark.storage.TachyonBlockManager</td>
+  <td>
+    Implementation of the external block manager (file system) that stores RDDs. The file system's URL is set by
+    <code>spark.externalBlockStore.url</code>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.externalBlockStore.baseDir</code></td>
   <td>System.getProperty("java.io.tmpdir")</td>
   <td>
-    Directories of the Tachyon File System that store RDDs. The Tachyon file system's URL is set by
-    <code>spark.tachyonStore.url</code>. It can also be a comma-separated list of multiple
+    Directories of the external block store that store RDDs. The file system's URL is set by
+    <code>spark.externalBlockStore.url</code>. It can also be a comma-separated list of multiple
     directories on the Tachyon file system.
   </td>
 </tr>
 <tr>
-  <td><code>spark.tachyonStore.url</code></td>
-  <td>tachyon://localhost:19998</td>
+  <td><code>spark.externalBlockStore.url</code></td>
+  <td>tachyon://localhost:19998 for Tachyon</td>
   <td>
-    The URL of the underlying Tachyon file system in the TachyonStore.
+    The URL of the underlying file system backing the external block store.
   </td>
 </tr>
 </table>
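
Putting the new keys together, a minimal sketch of configuring the external
block store from application code and caching an RDD off-heap; the app name,
master, and data are placeholders, and the values shown are the documented
defaults:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    val conf = new SparkConf()
      .setAppName("offheap-demo")   // placeholder
      .setMaster("local[*]")        // placeholder
      .set("spark.externalBlockStore.blockManager",
        "org.apache.spark.storage.TachyonBlockManager")
      .set("spark.externalBlockStore.url", "tachyon://localhost:19998")

    val sc = new SparkContext(conf)
    // OFF_HEAP routes cached partitions through the configured external
    // block manager instead of executor memory or local disk.
    sc.parallelize(1 to 1000).persist(StorageLevel.OFF_HEAP).count()
    sc.stop()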

