Posted to commits@spark.apache.org by jo...@apache.org on 2016/01/15 21:03:53 UTC

[1/2] spark git commit: [SPARK-12667] Remove block manager's internal "external block store" API

Repository: spark
Updated Branches:
  refs/heads/master 5f83c6991 -> ad1503f92


http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala
index 3dab15a..350c174 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.ui.storage
 
-import scala.xml.Utility
-
 import org.mockito.Mockito._
 
 import org.apache.spark.SparkFunSuite
@@ -64,26 +62,24 @@ class StoragePageSuite extends SparkFunSuite {
       "Cached Partitions",
       "Fraction Cached",
       "Size in Memory",
-      "Size in ExternalBlockStore",
       "Size on Disk")
     assert((xmlNodes \\ "th").map(_.text) === headers)
 
     assert((xmlNodes \\ "tr").size === 3)
     assert(((xmlNodes \\ "tr")(0) \\ "td").map(_.text.trim) ===
-      Seq("rdd1", "Memory Deserialized 1x Replicated", "10", "100%", "100.0 B", "0.0 B", "0.0 B"))
+      Seq("rdd1", "Memory Deserialized 1x Replicated", "10", "100%", "100.0 B", "0.0 B"))
     // Check the url
     assert(((xmlNodes \\ "tr")(0) \\ "td" \ "a")(0).attribute("href").map(_.text) ===
       Some("http://localhost:4040/storage/rdd?id=1"))
 
     assert(((xmlNodes \\ "tr")(1) \\ "td").map(_.text.trim) ===
-      Seq("rdd2", "Disk Serialized 1x Replicated", "5", "50%", "0.0 B", "0.0 B", "200.0 B"))
+      Seq("rdd2", "Disk Serialized 1x Replicated", "5", "50%", "0.0 B", "200.0 B"))
     // Check the url
     assert(((xmlNodes \\ "tr")(1) \\ "td" \ "a")(0).attribute("href").map(_.text) ===
       Some("http://localhost:4040/storage/rdd?id=2"))
 
     assert(((xmlNodes \\ "tr")(2) \\ "td").map(_.text.trim) ===
-      Seq("rdd3", "Disk Memory Serialized 1x Replicated", "10", "100%", "400.0 B", "0.0 B",
-        "500.0 B"))
+      Seq("rdd3", "Disk Memory Serialized 1x Replicated", "10", "100%", "400.0 B", "500.0 B"))
     // Check the url
     assert(((xmlNodes \\ "tr")(2) \\ "td" \ "a")(0).attribute("href").map(_.text) ===
       Some("http://localhost:4040/storage/rdd?id=3"))
@@ -98,16 +94,14 @@ class StoragePageSuite extends SparkFunSuite {
       "localhost:1111",
       StorageLevel.MEMORY_ONLY,
       memSize = 100,
-      diskSize = 0,
-      externalBlockStoreSize = 0)
+      diskSize = 0)
     assert(("Memory", 100) === storagePage.streamBlockStorageLevelDescriptionAndSize(memoryBlock))
 
     val memorySerializedBlock = BlockUIData(StreamBlockId(0, 0),
       "localhost:1111",
       StorageLevel.MEMORY_ONLY_SER,
       memSize = 100,
-      diskSize = 0,
-      externalBlockStoreSize = 0)
+      diskSize = 0)
     assert(("Memory Serialized", 100) ===
       storagePage.streamBlockStorageLevelDescriptionAndSize(memorySerializedBlock))
 
@@ -115,18 +109,8 @@ class StoragePageSuite extends SparkFunSuite {
       "localhost:1111",
       StorageLevel.DISK_ONLY,
       memSize = 0,
-      diskSize = 100,
-      externalBlockStoreSize = 0)
+      diskSize = 100)
     assert(("Disk", 100) === storagePage.streamBlockStorageLevelDescriptionAndSize(diskBlock))
-
-    val externalBlock = BlockUIData(StreamBlockId(0, 0),
-      "localhost:1111",
-      StorageLevel.OFF_HEAP,
-      memSize = 0,
-      diskSize = 0,
-      externalBlockStoreSize = 100)
-    assert(("External", 100) ===
-      storagePage.streamBlockStorageLevelDescriptionAndSize(externalBlock))
   }
 
   test("receiverBlockTables") {
@@ -135,14 +119,12 @@ class StoragePageSuite extends SparkFunSuite {
         "localhost:10000",
         StorageLevel.MEMORY_ONLY,
         memSize = 100,
-        diskSize = 0,
-        externalBlockStoreSize = 0),
+        diskSize = 0),
       BlockUIData(StreamBlockId(1, 1),
         "localhost:10000",
         StorageLevel.DISK_ONLY,
         memSize = 0,
-        diskSize = 100,
-        externalBlockStoreSize = 0)
+        diskSize = 100)
     )
     val executor0 = ExecutorStreamBlockStatus("0", "localhost:10000", blocksForExecutor0)
 
@@ -151,20 +133,12 @@ class StoragePageSuite extends SparkFunSuite {
         "localhost:10001",
         StorageLevel.MEMORY_ONLY,
         memSize = 100,
-        diskSize = 0,
-        externalBlockStoreSize = 0),
-      BlockUIData(StreamBlockId(2, 2),
-        "localhost:10001",
-        StorageLevel.OFF_HEAP,
-        memSize = 0,
-        diskSize = 0,
-        externalBlockStoreSize = 200),
+        diskSize = 0),
       BlockUIData(StreamBlockId(1, 1),
         "localhost:10001",
         StorageLevel.MEMORY_ONLY_SER,
         memSize = 100,
-        diskSize = 0,
-        externalBlockStoreSize = 0)
+        diskSize = 0)
     )
     val executor1 = ExecutorStreamBlockStatus("1", "localhost:10001", blocksForExecutor1)
     val xmlNodes = storagePage.receiverBlockTables(Seq(executor0, executor1))
@@ -174,16 +148,15 @@ class StoragePageSuite extends SparkFunSuite {
       "Executor ID",
       "Address",
       "Total Size in Memory",
-      "Total Size in ExternalBlockStore",
       "Total Size on Disk",
       "Stream Blocks")
     assert((executorTable \\ "th").map(_.text) === executorHeaders)
 
     assert((executorTable \\ "tr").size === 2)
     assert(((executorTable \\ "tr")(0) \\ "td").map(_.text.trim) ===
-      Seq("0", "localhost:10000", "100.0 B", "0.0 B", "100.0 B", "2"))
+      Seq("0", "localhost:10000", "100.0 B", "100.0 B", "2"))
     assert(((executorTable \\ "tr")(1) \\ "td").map(_.text.trim) ===
-      Seq("1", "localhost:10001", "200.0 B", "200.0 B", "0.0 B", "3"))
+      Seq("1", "localhost:10001", "200.0 B", "0.0 B", "2"))
 
     val blockTable = (xmlNodes \\ "table")(1)
     val blockHeaders = Seq(
@@ -194,7 +167,7 @@ class StoragePageSuite extends SparkFunSuite {
       "Size")
     assert((blockTable \\ "th").map(_.text) === blockHeaders)
 
-    assert((blockTable \\ "tr").size === 5)
+    assert((blockTable \\ "tr").size === 4)
     assert(((blockTable \\ "tr")(0) \\ "td").map(_.text.trim) ===
       Seq("input-0-0", "2", "localhost:10000", "Memory", "100.0 B"))
     // Check "rowspan=2" for the first 2 columns
@@ -212,17 +185,10 @@ class StoragePageSuite extends SparkFunSuite {
 
     assert(((blockTable \\ "tr")(3) \\ "td").map(_.text.trim) ===
       Seq("localhost:10001", "Memory Serialized", "100.0 B"))
-
-    assert(((blockTable \\ "tr")(4) \\ "td").map(_.text.trim) ===
-      Seq("input-2-2", "1", "localhost:10001", "External", "200.0 B"))
-    // Check "rowspan=1" for the first 2 columns
-    assert(((blockTable \\ "tr")(4) \\ "td")(0).attribute("rowspan").map(_.text) === Some("1"))
-    assert(((blockTable \\ "tr")(4) \\ "td")(1).attribute("rowspan").map(_.text) === Some("1"))
   }
 
   test("empty receiverBlockTables") {
     assert(storagePage.receiverBlockTables(Seq.empty).isEmpty)
-
     val executor0 = ExecutorStreamBlockStatus("0", "localhost:10000", Seq.empty)
     val executor1 = ExecutorStreamBlockStatus("1", "localhost:10001", Seq.empty)
     assert(storagePage.receiverBlockTables(Seq(executor0, executor1)).isEmpty)

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index 4b838a8..5ac922c 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -128,20 +128,17 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
     // Task end with a few new persisted blocks, some from the same RDD
     val metrics1 = new TaskMetrics
     metrics1.updatedBlocks = Some(Seq(
-      (RDDBlockId(0, 100), BlockStatus(memAndDisk, 400L, 0L, 0L)),
-      (RDDBlockId(0, 101), BlockStatus(memAndDisk, 0L, 400L, 0L)),
-      (RDDBlockId(0, 102), BlockStatus(memAndDisk, 400L, 0L, 200L)),
-      (RDDBlockId(1, 20), BlockStatus(memAndDisk, 0L, 240L, 0L))
+      (RDDBlockId(0, 100), BlockStatus(memAndDisk, 400L, 0L)),
+      (RDDBlockId(0, 101), BlockStatus(memAndDisk, 0L, 400L)),
+      (RDDBlockId(1, 20), BlockStatus(memAndDisk, 0L, 240L))
     ))
     bus.postToAll(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo, metrics1))
-    assert(storageListener._rddInfoMap(0).memSize === 800L)
+    assert(storageListener._rddInfoMap(0).memSize === 400L)
     assert(storageListener._rddInfoMap(0).diskSize === 400L)
-    assert(storageListener._rddInfoMap(0).externalBlockStoreSize === 200L)
-    assert(storageListener._rddInfoMap(0).numCachedPartitions === 3)
+    assert(storageListener._rddInfoMap(0).numCachedPartitions === 2)
     assert(storageListener._rddInfoMap(0).isCached)
     assert(storageListener._rddInfoMap(1).memSize === 0L)
     assert(storageListener._rddInfoMap(1).diskSize === 240L)
-    assert(storageListener._rddInfoMap(1).externalBlockStoreSize === 0L)
     assert(storageListener._rddInfoMap(1).numCachedPartitions === 1)
     assert(storageListener._rddInfoMap(1).isCached)
     assert(!storageListener._rddInfoMap(2).isCached)
@@ -150,16 +147,15 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
     // Task end with a few dropped blocks
     val metrics2 = new TaskMetrics
     metrics2.updatedBlocks = Some(Seq(
-      (RDDBlockId(0, 100), BlockStatus(none, 0L, 0L, 0L)),
-      (RDDBlockId(1, 20), BlockStatus(none, 0L, 0L, 0L)),
-      (RDDBlockId(2, 40), BlockStatus(none, 0L, 0L, 0L)), // doesn't actually exist
-      (RDDBlockId(4, 80), BlockStatus(none, 0L, 0L, 0L)) // doesn't actually exist
+      (RDDBlockId(0, 100), BlockStatus(none, 0L, 0L)),
+      (RDDBlockId(1, 20), BlockStatus(none, 0L, 0L)),
+      (RDDBlockId(2, 40), BlockStatus(none, 0L, 0L)), // doesn't actually exist
+      (RDDBlockId(4, 80), BlockStatus(none, 0L, 0L)) // doesn't actually exist
     ))
     bus.postToAll(SparkListenerTaskEnd(2, 0, "obliteration", Success, taskInfo, metrics2))
-    assert(storageListener._rddInfoMap(0).memSize === 400L)
+    assert(storageListener._rddInfoMap(0).memSize === 0L)
     assert(storageListener._rddInfoMap(0).diskSize === 400L)
-    assert(storageListener._rddInfoMap(0).externalBlockStoreSize === 200L)
-    assert(storageListener._rddInfoMap(0).numCachedPartitions === 2)
+    assert(storageListener._rddInfoMap(0).numCachedPartitions === 1)
     assert(storageListener._rddInfoMap(0).isCached)
     assert(!storageListener._rddInfoMap(1).isCached)
     assert(storageListener._rddInfoMap(2).numCachedPartitions === 0)
@@ -175,8 +171,8 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
     val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), Seq.empty, "details")
     val taskMetrics0 = new TaskMetrics
     val taskMetrics1 = new TaskMetrics
-    val block0 = (RDDBlockId(0, 1), BlockStatus(memOnly, 100L, 0L, 0L))
-    val block1 = (RDDBlockId(1, 1), BlockStatus(memOnly, 200L, 0L, 0L))
+    val block0 = (RDDBlockId(0, 1), BlockStatus(memOnly, 100L, 0L))
+    val block1 = (RDDBlockId(1, 1), BlockStatus(memOnly, 200L, 0L))
     taskMetrics0.updatedBlocks = Some(Seq(block0))
     taskMetrics1.updatedBlocks = Some(Seq(block1))
     bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 6566400..068e839 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -801,7 +801,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     }
     // Make at most 6 blocks
     t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i =>
-      (RDDBlockId(e % i, f % i), BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i, c%i))
+      (RDDBlockId(e % i, f % i), BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i))
     }.toSeq)
     t
   }
@@ -867,14 +867,12 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        "Storage Level": {
       |          "Use Disk": true,
       |          "Use Memory": true,
-      |          "Use ExternalBlockStore": false,
       |          "Deserialized": true,
       |          "Replication": 1
       |        },
       |        "Number of Partitions": 201,
       |        "Number of Cached Partitions": 301,
       |        "Memory Size": 401,
-      |        "ExternalBlockStore Size": 0,
       |        "Disk Size": 501
       |      }
       |    ],
@@ -1063,12 +1061,10 @@ class JsonProtocolSuite extends SparkFunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use ExternalBlockStore": false,
       |            "Deserialized": false,
       |            "Replication": 2
       |          },
       |          "Memory Size": 0,
-      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 0
       |        }
       |      }
@@ -1149,12 +1145,10 @@ class JsonProtocolSuite extends SparkFunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use ExternalBlockStore": false,
       |            "Deserialized": false,
       |            "Replication": 2
       |          },
       |          "Memory Size": 0,
-      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 0
       |        }
       |      }
@@ -1235,12 +1229,10 @@ class JsonProtocolSuite extends SparkFunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use ExternalBlockStore": false,
       |            "Deserialized": false,
       |            "Replication": 2
       |          },
       |          "Memory Size": 0,
-      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 0
       |        }
       |      }
@@ -1270,14 +1262,12 @@ class JsonProtocolSuite extends SparkFunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use ExternalBlockStore": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
       |          "Number of Partitions": 200,
       |          "Number of Cached Partitions": 300,
       |          "Memory Size": 400,
-      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 500
       |        }
       |      ],
@@ -1314,14 +1304,12 @@ class JsonProtocolSuite extends SparkFunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use ExternalBlockStore": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
       |          "Number of Partitions": 400,
       |          "Number of Cached Partitions": 600,
       |          "Memory Size": 800,
-      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 1000
       |        },
       |        {
@@ -1332,14 +1320,12 @@ class JsonProtocolSuite extends SparkFunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use ExternalBlockStore": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
       |          "Number of Partitions": 401,
       |          "Number of Cached Partitions": 601,
       |          "Memory Size": 801,
-      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 1001
       |        }
       |      ],
@@ -1376,14 +1362,12 @@ class JsonProtocolSuite extends SparkFunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use ExternalBlockStore": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
       |          "Number of Partitions": 600,
       |          "Number of Cached Partitions": 900,
       |          "Memory Size": 1200,
-      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 1500
       |        },
       |        {
@@ -1394,14 +1378,12 @@ class JsonProtocolSuite extends SparkFunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use ExternalBlockStore": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
       |          "Number of Partitions": 601,
       |          "Number of Cached Partitions": 901,
       |          "Memory Size": 1201,
-      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 1501
       |        },
       |        {
@@ -1412,14 +1394,12 @@ class JsonProtocolSuite extends SparkFunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use ExternalBlockStore": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
       |          "Number of Partitions": 602,
       |          "Number of Cached Partitions": 902,
       |          "Memory Size": 1202,
-      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 1502
       |        }
       |      ],
@@ -1456,14 +1436,12 @@ class JsonProtocolSuite extends SparkFunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use ExternalBlockStore": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
       |          "Number of Partitions": 800,
       |          "Number of Cached Partitions": 1200,
       |          "Memory Size": 1600,
-      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 2000
       |        },
       |        {
@@ -1474,14 +1452,12 @@ class JsonProtocolSuite extends SparkFunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use ExternalBlockStore": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
       |          "Number of Partitions": 801,
       |          "Number of Cached Partitions": 1201,
       |          "Memory Size": 1601,
-      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 2001
       |        },
       |        {
@@ -1492,14 +1468,12 @@ class JsonProtocolSuite extends SparkFunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use ExternalBlockStore": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
       |          "Number of Partitions": 802,
       |          "Number of Cached Partitions": 1202,
       |          "Memory Size": 1602,
-      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 2002
       |        },
       |        {
@@ -1510,14 +1484,12 @@ class JsonProtocolSuite extends SparkFunSuite {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
-      |            "Use ExternalBlockStore": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
       |          "Number of Partitions": 803,
       |          "Number of Cached Partitions": 1203,
       |          "Memory Size": 1603,
-      |          "ExternalBlockStore Size": 0,
       |          "Disk Size": 2003
       |        }
       |      ],
@@ -1723,12 +1695,10 @@ class JsonProtocolSuite extends SparkFunSuite {
      |          "Storage Level": {
      |            "Use Disk": true,
      |            "Use Memory": true,
-     |            "Use ExternalBlockStore": false,
      |            "Deserialized": false,
      |            "Replication": 2
      |          },
      |          "Memory Size": 0,
-     |          "ExternalBlockStore Size": 0,
      |          "Disk Size": 0
      |        }
      |      }

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/dev/deps/spark-deps-hadoop-2.2
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index fb2e91e..0760529 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -35,7 +35,7 @@ commons-configuration-1.6.jar
 commons-dbcp-1.4.jar
 commons-digester-1.8.jar
 commons-httpclient-3.1.jar
-commons-io-2.4.jar
+commons-io-2.1.jar
 commons-lang-2.6.jar
 commons-lang3-3.3.2.jar
 commons-logging-1.1.3.jar
@@ -179,10 +179,6 @@ stax-api-1.0-2.jar
 stax-api-1.0.1.jar
 stream-2.7.0.jar
 super-csv-2.2.0.jar
-tachyon-client-0.8.2.jar
-tachyon-underfs-hdfs-0.8.2.jar
-tachyon-underfs-local-0.8.2.jar
-tachyon-underfs-s3-0.8.2.jar
 uncommons-maths-1.2.2a.jar
 univocity-parsers-1.5.6.jar
 unused-1.0.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/dev/deps/spark-deps-hadoop-2.3
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index 59e4d4f..191f2a0 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -170,10 +170,6 @@ stax-api-1.0-2.jar
 stax-api-1.0.1.jar
 stream-2.7.0.jar
 super-csv-2.2.0.jar
-tachyon-client-0.8.2.jar
-tachyon-underfs-hdfs-0.8.2.jar
-tachyon-underfs-local-0.8.2.jar
-tachyon-underfs-s3-0.8.2.jar
 uncommons-maths-1.2.2a.jar
 univocity-parsers-1.5.6.jar
 unused-1.0.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/dev/deps/spark-deps-hadoop-2.4
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index e4395c8..9134e99 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -171,10 +171,6 @@ stax-api-1.0-2.jar
 stax-api-1.0.1.jar
 stream-2.7.0.jar
 super-csv-2.2.0.jar
-tachyon-client-0.8.2.jar
-tachyon-underfs-hdfs-0.8.2.jar
-tachyon-underfs-local-0.8.2.jar
-tachyon-underfs-s3-0.8.2.jar
 uncommons-maths-1.2.2a.jar
 univocity-parsers-1.5.6.jar
 unused-1.0.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/dev/deps/spark-deps-hadoop-2.6
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 89fd15d..8c45832 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -177,10 +177,6 @@ stax-api-1.0-2.jar
 stax-api-1.0.1.jar
 stream-2.7.0.jar
 super-csv-2.2.0.jar
-tachyon-client-0.8.2.jar
-tachyon-underfs-hdfs-0.8.2.jar
-tachyon-underfs-local-0.8.2.jar
-tachyon-underfs-s3-0.8.2.jar
 uncommons-maths-1.2.2a.jar
 univocity-parsers-1.5.6.jar
 unused-1.0.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
deleted file mode 100644
index 8b739c9..0000000
--- a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples
-
-import java.util.Random
-
-import scala.math.exp
-
-import breeze.linalg.{DenseVector, Vector}
-import org.apache.hadoop.conf.Configuration
-
-import org.apache.spark._
-import org.apache.spark.storage.StorageLevel
-
-/**
- * Logistic regression based classification.
- * This example uses Tachyon to persist RDDs during computation.
- *
- * This is an example implementation for learning how to use Spark. For more conventional use,
- * please refer to either org.apache.spark.mllib.classification.LogisticRegressionWithSGD or
- * org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS based on your needs.
- */
-object SparkTachyonHdfsLR {
-  val D = 10   // Number of dimensions
-  val rand = new Random(42)
-
-  def showWarning() {
-    System.err.println(
-      """WARN: This is a naive implementation of Logistic Regression and is given as an example!
-        |Please use either org.apache.spark.mllib.classification.LogisticRegressionWithSGD or
-        |org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
-        |for more conventional use.
-      """.stripMargin)
-  }
-
-  case class DataPoint(x: Vector[Double], y: Double)
-
-  def parsePoint(line: String): DataPoint = {
-    val tok = new java.util.StringTokenizer(line, " ")
-    var y = tok.nextToken.toDouble
-    var x = new Array[Double](D)
-    var i = 0
-    while (i < D) {
-      x(i) = tok.nextToken.toDouble; i += 1
-    }
-    DataPoint(new DenseVector(x), y)
-  }
-
-  def main(args: Array[String]) {
-
-    showWarning()
-
-    val inputPath = args(0)
-    val sparkConf = new SparkConf().setAppName("SparkTachyonHdfsLR")
-    val conf = new Configuration()
-    val sc = new SparkContext(sparkConf)
-    val lines = sc.textFile(inputPath)
-    val points = lines.map(parsePoint).persist(StorageLevel.OFF_HEAP)
-    val ITERATIONS = args(1).toInt
-
-    // Initialize w to a random value
-    var w = DenseVector.fill(D){2 * rand.nextDouble - 1}
-    println("Initial w: " + w)
-
-    for (i <- 1 to ITERATIONS) {
-      println("On iteration " + i)
-      val gradient = points.map { p =>
-        p.x * (1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y
-      }.reduce(_ + _)
-      w -= gradient
-    }
-
-    println("Final w: " + w)
-    sc.stop()
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala
deleted file mode 100644
index e46ac65..0000000
--- a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples
-
-import scala.math.random
-
-import org.apache.spark._
-import org.apache.spark.storage.StorageLevel
-
-/**
- * Computes an approximation to pi.
- * This example uses Tachyon to persist RDDs during computation.
- */
-object SparkTachyonPi {
-  def main(args: Array[String]) {
-    val sparkConf = new SparkConf().setAppName("SparkTachyonPi")
-    val spark = new SparkContext(sparkConf)
-
-    val slices = if (args.length > 0) args(0).toInt else 2
-    val n = 100000 * slices
-
-    val rdd = spark.parallelize(1 to n, slices)
-    rdd.persist(StorageLevel.OFF_HEAP)
-    val count = rdd.map { i =>
-      val x = random * 2 - 1
-      val y = random * 2 - 1
-      if (x * x + y * y < 1) 1 else 0
-    }.reduce(_ + _)
-    println("Pi is roughly " + 4.0 * count / n)
-
-    spark.stop()
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 4206d1f..ccd3c34 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -120,7 +120,11 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.toArray"),
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaSparkContext.defaultMinSplits"),
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaSparkContext.clearJars"),
-        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaSparkContext.clearFiles")
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaSparkContext.clearFiles"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.externalBlockStoreFolderName"),
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.storage.ExternalBlockStore$"),
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.storage.ExternalBlockManager"),
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.storage.ExternalBlockStore")
       ) ++
       // SPARK-12665 Remove deprecated and unused classes
       Seq(




[2/2] spark git commit: [SPARK-12667] Remove block manager's internal "external block store" API

Posted by jo...@apache.org.
[SPARK-12667] Remove block manager's internal "external block store" API

This pull request removes the external block store API. The API is rarely used, and the file system interface is a better, more standard way to interact with external storage systems.

As pointed out by JoshRosen, there are some other things to remove as well; we will do those in follow-up pull requests.

Author: Reynold Xin <rx...@databricks.com>

Closes #10752 from rxin/remove-offheap.
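
For users migrating off the removed API, here is a minimal sketch of the file-system-based alternative the description refers to. The tachyon:// URI is an assumption about the deployment; any Hadoop-compatible file system URI (hdfs://, s3n://, etc.) works the same way, and saveAsObjectFile/objectFile are standard Spark APIs:

    import org.apache.spark.{SparkConf, SparkContext}

    object OffHeapMigrationSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("OffHeapMigrationSketch"))
        val rdd = sc.parallelize(1 to 1000)

        // Before this change, rdd.persist(StorageLevel.OFF_HEAP) routed the data
        // to the external block store (Tachyon). The replacement is to go through
        // the standard Hadoop file system interface instead.
        val path = "tachyon://host:19998/tmp/rdd-sketch" // assumed deployment URI
        rdd.saveAsObjectFile(path)

        // Reading back uses the same interface.
        val restored = sc.objectFile[Int](path)
        println(restored.count())
        sc.stop()
      }
    }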


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad1503f9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad1503f9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad1503f9

Branch: refs/heads/master
Commit: ad1503f92e1f6e960a24f9f5d36b1735d1f5073a
Parents: 5f83c69
Author: Reynold Xin <rx...@databricks.com>
Authored: Fri Jan 15 12:03:28 2016 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Fri Jan 15 12:03:28 2016 -0800

----------------------------------------------------------------------
 core/pom.xml                                    |  27 --
 .../scala/org/apache/spark/SparkContext.scala   |   6 -
 .../spark/rdd/LocalRDDCheckpointData.scala      |   8 +-
 .../org/apache/spark/status/api/v1/api.scala    |   2 -
 .../org/apache/spark/storage/BlockManager.scala |  55 +---
 .../spark/storage/BlockManagerMaster.scala      |   6 +-
 .../storage/BlockManagerMasterEndpoint.scala    |  41 +--
 .../spark/storage/BlockManagerMessages.scala    |   7 +-
 .../spark/storage/BlockStatusListener.scala     |   9 +-
 .../apache/spark/storage/BlockUpdatedInfo.scala |   6 +-
 .../spark/storage/ExternalBlockManager.scala    | 122 -------
 .../spark/storage/ExternalBlockStore.scala      | 211 ------------
 .../org/apache/spark/storage/RDDInfo.scala      |   9 +-
 .../org/apache/spark/storage/StorageLevel.scala |   4 +-
 .../org/apache/spark/storage/StorageUtils.scala |  31 +-
 .../spark/storage/TachyonBlockManager.scala     | 324 -------------------
 .../apache/spark/ui/storage/StoragePage.scala   |   8 -
 .../org/apache/spark/util/JsonProtocol.scala    |  17 +-
 .../spark/memory/MemoryManagerSuite.scala       |   4 +-
 .../apache/spark/rdd/LocalCheckpointSuite.scala |   6 -
 .../spark/storage/BlockManagerSuite.scala       |  26 +-
 .../storage/BlockStatusListenerSuite.scala      |  18 +-
 .../storage/StorageStatusListenerSuite.scala    |  18 +-
 .../org/apache/spark/storage/StorageSuite.scala |  97 +++---
 .../spark/ui/storage/StoragePageSuite.scala     |  60 +---
 .../spark/ui/storage/StorageTabSuite.scala      |  30 +-
 .../apache/spark/util/JsonProtocolSuite.scala   |  32 +-
 dev/deps/spark-deps-hadoop-2.2                  |   6 +-
 dev/deps/spark-deps-hadoop-2.3                  |   4 -
 dev/deps/spark-deps-hadoop-2.4                  |   4 -
 dev/deps/spark-deps-hadoop-2.6                  |   4 -
 .../spark/examples/SparkTachyonHdfsLR.scala     |  93 ------
 .../apache/spark/examples/SparkTachyonPi.scala  |  50 ---
 project/MimaExcludes.scala                      |   6 +-
 34 files changed, 139 insertions(+), 1212 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 3bec5de..2071a58 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -268,33 +268,6 @@
       <version>${oro.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.tachyonproject</groupId>
-      <artifactId>tachyon-client</artifactId>
-      <version>0.8.2</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.curator</groupId>
-          <artifactId>curator-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.curator</groupId>
-          <artifactId>curator-framework</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.curator</groupId>
-          <artifactId>curator-recipes</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.tachyonproject</groupId>
-          <artifactId>tachyon-underfs-glusterfs</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
       <groupId>org.seleniumhq.selenium</groupId>
       <artifactId>selenium-java</artifactId>
       <exclusions>

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 98075ce..77acb70 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -243,10 +243,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private[spark] def eventLogDir: Option[URI] = _eventLogDir
   private[spark] def eventLogCodec: Option[String] = _eventLogCodec
 
-  // Generate the random name for a temp folder in external block store.
-  // Add a timestamp as the suffix here to make it more safe
-  val externalBlockStoreFolderName = "spark-" + randomUUID.toString()
-
   def isLocal: Boolean = (master == "local" || master.startsWith("local["))
 
   /**
@@ -423,8 +419,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       }
     }
 
-    _conf.set("spark.externalBlockStore.folderName", externalBlockStoreFolderName)
-
     if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
 
     // "_jobProgressListener" should be set up before creating SparkEnv because when creating

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
index c115e0f..dad90fc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
@@ -19,7 +19,7 @@ package org.apache.spark.rdd
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.{Logging, SparkEnv, SparkException, TaskContext}
+import org.apache.spark.{Logging, SparkEnv, TaskContext}
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
 import org.apache.spark.util.Utils
 
@@ -72,12 +72,6 @@ private[spark] object LocalRDDCheckpointData {
    * This method is idempotent.
    */
   def transformStorageLevel(level: StorageLevel): StorageLevel = {
-    // If this RDD is to be cached off-heap, fail fast since we cannot provide any
-    // correctness guarantees about subsequent computations after the first one
-    if (level.useOffHeap) {
-      throw new SparkException("Local checkpointing is not compatible with off-heap caching.")
-    }
-
     StorageLevel(useDisk = true, level.useMemory, level.deserialized, level.replication)
   }
 }
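
The local checkpoint change above is easy to miss: transformStorageLevel no longer rejects off-heap levels; it simply forces a disk component. A small sketch of the post-change behavior, using the StorageLevel factory shown in the hunk (the standalone object name is illustrative):

    import org.apache.spark.storage.StorageLevel

    object TransformLevelSketch {
      // Mirrors the new body: keep the memory/deserialized/replication settings
      // but always add a disk component so local checkpoint blocks survive eviction.
      def transformStorageLevel(level: StorageLevel): StorageLevel =
        StorageLevel(useDisk = true, level.useMemory, level.deserialized, level.replication)

      def main(args: Array[String]): Unit = {
        // MEMORY_ONLY is promoted to MEMORY_AND_DISK; a disk-backed level is unchanged.
        assert(transformStorageLevel(StorageLevel.MEMORY_ONLY) == StorageLevel.MEMORY_AND_DISK)
        assert(transformStorageLevel(StorageLevel.MEMORY_AND_DISK) == StorageLevel.MEMORY_AND_DISK)
      }
    }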

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 9cd52d6..fe37211 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -85,8 +85,6 @@ class JobData private[spark](
     val numSkippedStages: Int,
     val numFailedStages: Int)
 
-// Q: should Tachyon size go in here as well?  currently the UI only shows it on the overall storage
-// page ... does anybody pay attention to it?
 class RDDStorageInfo private[spark](
     val id: Int,
     val name: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 4479e68..e49d79b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -83,13 +83,8 @@ private[spark] class BlockManager(
     ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
 
   // Actual storage of where blocks are kept
-  private var externalBlockStoreInitialized = false
   private[spark] val memoryStore = new MemoryStore(this, memoryManager)
   private[spark] val diskStore = new DiskStore(this, diskBlockManager)
-  private[spark] lazy val externalBlockStore: ExternalBlockStore = {
-    externalBlockStoreInitialized = true
-    new ExternalBlockStore(this, executorId)
-  }
   memoryManager.setMemoryStore(memoryStore)
 
   // Note: depending on the memory manager, `maxStorageMemory` may actually vary over time.
@@ -313,8 +308,7 @@ private[spark] class BlockManager(
     blockInfo.asScala.get(blockId).map { info =>
       val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
       val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
-      // Assume that block is not in external block store
-      BlockStatus(info.level, memSize, diskSize, 0L)
+      BlockStatus(info.level, memSize = memSize, diskSize = diskSize)
     }
   }
 
@@ -363,10 +357,8 @@ private[spark] class BlockManager(
     if (info.tellMaster) {
       val storageLevel = status.storageLevel
       val inMemSize = Math.max(status.memSize, droppedMemorySize)
-      val inExternalBlockStoreSize = status.externalBlockStoreSize
       val onDiskSize = status.diskSize
-      master.updateBlockInfo(
-        blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inExternalBlockStoreSize)
+      master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
     } else {
       true
     }
@@ -381,20 +373,17 @@ private[spark] class BlockManager(
     info.synchronized {
       info.level match {
         case null =>
-          BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
+          BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L)
         case level =>
           val inMem = level.useMemory && memoryStore.contains(blockId)
-          val inExternalBlockStore = level.useOffHeap && externalBlockStore.contains(blockId)
           val onDisk = level.useDisk && diskStore.contains(blockId)
           val deserialized = if (inMem) level.deserialized else false
-          val replication = if (inMem || inExternalBlockStore || onDisk) level.replication else 1
+          val replication = if (inMem || onDisk) level.replication else 1
           val storageLevel =
-            StorageLevel(onDisk, inMem, inExternalBlockStore, deserialized, replication)
+            StorageLevel(onDisk, inMem, deserialized, replication)
           val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
-          val externalBlockStoreSize =
-            if (inExternalBlockStore) externalBlockStore.getSize(blockId) else 0L
           val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
-          BlockStatus(storageLevel, memSize, diskSize, externalBlockStoreSize)
+          BlockStatus(storageLevel, memSize, diskSize)
       }
     }
   }
@@ -475,25 +464,6 @@ private[spark] class BlockManager(
           }
         }
 
-        // Look for the block in external block store
-        if (level.useOffHeap) {
-          logDebug(s"Getting block $blockId from ExternalBlockStore")
-          if (externalBlockStore.contains(blockId)) {
-            val result = if (asBlockResult) {
-              externalBlockStore.getValues(blockId)
-                .map(new BlockResult(_, DataReadMethod.Memory, info.size))
-            } else {
-              externalBlockStore.getBytes(blockId)
-            }
-            result match {
-              case Some(values) =>
-                return result
-              case None =>
-                logDebug(s"Block $blockId not found in ExternalBlockStore")
-            }
-          }
-        }
-
         // Look for block on disk, potentially storing it back in memory if required
         if (level.useDisk) {
           logDebug(s"Getting block $blockId from disk")
@@ -786,9 +756,6 @@ private[spark] class BlockManager(
             // Put it in memory first, even if it also has useDisk set to true;
             // We will drop it to disk later if the memory store can't hold it.
             (true, memoryStore)
-          } else if (putLevel.useOffHeap) {
-            // Use external block store
-            (false, externalBlockStore)
           } else if (putLevel.useDisk) {
             // Don't get back the bytes from put unless we replicate them
             (putLevel.replication > 1, diskStore)
@@ -909,8 +876,7 @@ private[spark] class BlockManager(
     val peersForReplication = new ArrayBuffer[BlockManagerId]
     val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
     val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
-    val tLevel = StorageLevel(
-      level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
+    val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
     val startTime = System.currentTimeMillis
     val random = new Random(blockId.hashCode)
 
@@ -1120,9 +1086,7 @@ private[spark] class BlockManager(
         // Removals are idempotent in disk store and memory store. At worst, we get a warning.
         val removedFromMemory = memoryStore.remove(blockId)
         val removedFromDisk = diskStore.remove(blockId)
-        val removedFromExternalBlockStore =
-          if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
-        if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
+        if (!removedFromMemory && !removedFromDisk) {
           logWarning(s"Block $blockId could not be removed as it was not found in either " +
             "the disk, memory, or external block store")
         }
@@ -1212,9 +1176,6 @@ private[spark] class BlockManager(
     blockInfo.clear()
     memoryStore.clear()
     diskStore.clear()
-    if (externalBlockStoreInitialized) {
-      externalBlockStore.clear()
-    }
     futureExecutionContext.shutdownNow()
     logInfo("BlockManager stopped")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index da1de11..0b7aa59 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -54,11 +54,9 @@ class BlockManagerMaster(
       blockId: BlockId,
       storageLevel: StorageLevel,
       memSize: Long,
-      diskSize: Long,
-      externalBlockStoreSize: Long): Boolean = {
+      diskSize: Long): Boolean = {
     val res = driverEndpoint.askWithRetry[Boolean](
-      UpdateBlockInfo(blockManagerId, blockId, storageLevel,
-        memSize, diskSize, externalBlockStoreSize))
+      UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
     logDebug(s"Updated info of block $blockId")
     res
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 4db400a..fbb3df8 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -59,10 +59,9 @@ class BlockManagerMasterEndpoint(
       register(blockManagerId, maxMemSize, slaveEndpoint)
       context.reply(true)
 
-    case _updateBlockInfo @ UpdateBlockInfo(
-      blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) =>
-      context.reply(updateBlockInfo(
-        blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize))
+    case _updateBlockInfo @
+        UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
+      context.reply(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
       listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
 
     case GetLocations(blockId) =>
@@ -325,8 +324,7 @@ class BlockManagerMasterEndpoint(
       blockId: BlockId,
       storageLevel: StorageLevel,
       memSize: Long,
-      diskSize: Long,
-      externalBlockStoreSize: Long): Boolean = {
+      diskSize: Long): Boolean = {
 
     if (!blockManagerInfo.contains(blockManagerId)) {
       if (blockManagerId.isDriver && !isLocal) {
@@ -343,8 +341,7 @@ class BlockManagerMasterEndpoint(
       return true
     }
 
-    blockManagerInfo(blockManagerId).updateBlockInfo(
-      blockId, storageLevel, memSize, diskSize, externalBlockStoreSize)
+    blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
 
     var locations: mutable.HashSet[BlockManagerId] = null
     if (blockLocations.containsKey(blockId)) {
@@ -404,17 +401,13 @@ class BlockManagerMasterEndpoint(
 }
 
 @DeveloperApi
-case class BlockStatus(
-    storageLevel: StorageLevel,
-    memSize: Long,
-    diskSize: Long,
-    externalBlockStoreSize: Long) {
-  def isCached: Boolean = memSize + diskSize + externalBlockStoreSize > 0
+case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) {
+  def isCached: Boolean = memSize + diskSize > 0
 }
 
 @DeveloperApi
 object BlockStatus {
-  def empty: BlockStatus = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
+  def empty: BlockStatus = BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L)
 }
 
 private[spark] class BlockManagerInfo(
@@ -443,8 +436,7 @@ private[spark] class BlockManagerInfo(
       blockId: BlockId,
       storageLevel: StorageLevel,
       memSize: Long,
-      diskSize: Long,
-      externalBlockStoreSize: Long) {
+      diskSize: Long) {
 
     updateLastSeenMs()
 
@@ -468,7 +460,7 @@ private[spark] class BlockManagerInfo(
        * Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
       var blockStatus: BlockStatus = null
       if (storageLevel.useMemory) {
-        blockStatus = BlockStatus(storageLevel, memSize, 0, 0)
+        blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 0)
         _blocks.put(blockId, blockStatus)
         _remainingMem -= memSize
         logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
@@ -476,17 +468,11 @@ private[spark] class BlockManagerInfo(
           Utils.bytesToString(_remainingMem)))
       }
       if (storageLevel.useDisk) {
-        blockStatus = BlockStatus(storageLevel, 0, diskSize, 0)
+        blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = diskSize)
         _blocks.put(blockId, blockStatus)
         logInfo("Added %s on disk on %s (size: %s)".format(
           blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
       }
-      if (storageLevel.useOffHeap) {
-        blockStatus = BlockStatus(storageLevel, 0, 0, externalBlockStoreSize)
-        _blocks.put(blockId, blockStatus)
-        logInfo("Added %s on ExternalBlockStore on %s (size: %s)".format(
-          blockId, blockManagerId.hostPort, Utils.bytesToString(externalBlockStoreSize)))
-      }
       if (!blockId.isBroadcast && blockStatus.isCached) {
         _cachedBlocks += blockId
       }
@@ -504,11 +490,6 @@ private[spark] class BlockManagerInfo(
         logInfo("Removed %s on %s on disk (size: %s)".format(
           blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
       }
-      if (blockStatus.storageLevel.useOffHeap) {
-        logInfo("Removed %s on %s on externalBlockStore (size: %s)".format(
-          blockId, blockManagerId.hostPort,
-          Utils.bytesToString(blockStatus.externalBlockStoreSize)))
-      }
     }
   }
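
For readers skimming the hunks, the storage-status data type after this change consolidates to the shape below. The case class and companion are reconstructed verbatim from the diff; the demo object wrapping them is illustrative:

    import org.apache.spark.storage.StorageLevel

    object BlockStatusSketch {
      // externalBlockStoreSize is gone; only memory and disk sizes remain.
      case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) {
        def isCached: Boolean = memSize + diskSize > 0
      }

      object BlockStatus {
        def empty: BlockStatus = BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L)
      }

      def main(args: Array[String]): Unit = {
        // A block held only on disk still counts as cached; the empty status does not.
        assert(BlockStatus(StorageLevel.DISK_ONLY, memSize = 0L, diskSize = 100L).isCached)
        assert(!BlockStatus.empty.isCached)
      }
    }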
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index f392a4a..6bded92 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -63,12 +63,11 @@ private[spark] object BlockManagerMessages {
       var blockId: BlockId,
       var storageLevel: StorageLevel,
       var memSize: Long,
-      var diskSize: Long,
-      var externalBlockStoreSize: Long)
+      var diskSize: Long)
     extends ToBlockManagerMaster
     with Externalizable {
 
-    def this() = this(null, null, null, 0, 0, 0)  // For deserialization only
+    def this() = this(null, null, null, 0, 0)  // For deserialization only
 
     override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
       blockManagerId.writeExternal(out)
@@ -76,7 +75,6 @@ private[spark] object BlockManagerMessages {
       storageLevel.writeExternal(out)
       out.writeLong(memSize)
       out.writeLong(diskSize)
-      out.writeLong(externalBlockStoreSize)
     }
 
     override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -85,7 +83,6 @@ private[spark] object BlockManagerMessages {
       storageLevel = StorageLevel(in)
       memSize = in.readLong()
       diskSize = in.readLong()
-      externalBlockStoreSize = in.readLong()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala
index 2789e25..0a14fca 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala
@@ -26,8 +26,7 @@ private[spark] case class BlockUIData(
     location: String,
     storageLevel: StorageLevel,
     memSize: Long,
-    diskSize: Long,
-    externalBlockStoreSize: Long)
+    diskSize: Long)
 
 /**
  * The aggregated status of stream blocks in an executor
@@ -41,8 +40,6 @@ private[spark] case class ExecutorStreamBlockStatus(
 
   def totalDiskSize: Long = blocks.map(_.diskSize).sum
 
-  def totalExternalBlockStoreSize: Long = blocks.map(_.externalBlockStoreSize).sum
-
   def numStreamBlocks: Int = blocks.size
 
 }
@@ -62,7 +59,6 @@ private[spark] class BlockStatusListener extends SparkListener {
     val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
     val memSize = blockUpdated.blockUpdatedInfo.memSize
     val diskSize = blockUpdated.blockUpdatedInfo.diskSize
-    val externalBlockStoreSize = blockUpdated.blockUpdatedInfo.externalBlockStoreSize
 
     synchronized {
       // Drop the update info if the block manager is not registered
@@ -74,8 +70,7 @@ private[spark] class BlockStatusListener extends SparkListener {
               blockManagerId.hostPort,
               storageLevel,
               memSize,
-              diskSize,
-              externalBlockStoreSize)
+              diskSize)
           )
         } else {
           // If isValid is not true, it means we should drop the block.

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala
index a5790e4..e070bf6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala
@@ -30,8 +30,7 @@ case class BlockUpdatedInfo(
     blockId: BlockId,
     storageLevel: StorageLevel,
     memSize: Long,
-    diskSize: Long,
-    externalBlockStoreSize: Long)
+    diskSize: Long)
 
 private[spark] object BlockUpdatedInfo {
 
@@ -41,7 +40,6 @@ private[spark] object BlockUpdatedInfo {
       updateBlockInfo.blockId,
       updateBlockInfo.storageLevel,
       updateBlockInfo.memSize,
-      updateBlockInfo.diskSize,
-      updateBlockInfo.externalBlockStoreSize)
+      updateBlockInfo.diskSize)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
deleted file mode 100644
index f39325a..0000000
--- a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.nio.ByteBuffer
-
-/**
- * An abstract class that a concrete external block manager has to inherit.
- * The class must have a no-argument constructor and is initialized by init,
- * which is invoked by ExternalBlockStore. The main input parameter to all
- * the methods is blockId, the unique identifier for a block within one Spark application.
- *
- * The underlying external block manager should avoid any namespace conflicts among multiple
- * Spark applications, for example by creating a different directory for each application
- * via randomUUID.
- *
- */
-private[spark] abstract class ExternalBlockManager {
-
-  protected var blockManager: BlockManager = _
-
-  override def toString: String = {"External Block Store"}
-
-  /**
-   * Initialize a concrete block manager implementation. A subclass should initialize its
-   * internal data structures, e.g., the file system, in this function, which is invoked by
-   * ExternalBlockStore right after the class is constructed. The function should throw
-   * IOException on failure.
-   *
-   * @throws java.io.IOException if there is any file system failure during the initialization.
-   */
-  def init(blockManager: BlockManager, executorId: String): Unit = {
-    this.blockManager = blockManager
-  }
-
-  /**
-   * Drop the block from the underlying external block store, if it exists.
-   * @return true on successfully removing the block
-   *         false if the block could not be removed as it was not found
-   *
-   * @throws java.io.IOException if there is any file system failure in removing the block.
-   */
-  def removeBlock(blockId: BlockId): Boolean
-
-  /**
-   * Used by BlockManager to check the existence of the block in the underlying external
-   * block store.
-   * @return true if the block exists.
-   *         false if the block does not exist.
-   *
-   * @throws java.io.IOException if there is any file system failure in checking
-   *                             the block existence.
-   */
-  def blockExists(blockId: BlockId): Boolean
-
-  /**
-   * Put the given block into the underlying external block store. Note that in the normal
-   * case, putting a block should never fail unless something goes wrong with the underlying
-   * external block store, e.g., a file system failure. In that case, an IOException
-   * should be thrown.
-   *
-   * @throws java.io.IOException if there is any file system failure in putting the block.
-   */
-  def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit
-
-  def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
-    val bytes = blockManager.dataSerialize(blockId, values)
-    putBytes(blockId, bytes)
-  }
-
-  /**
-   * Retrieve the block bytes.
-   * @return Some(ByteBuffer) if the block bytes are successfully retrieved
-   *         None if the block does not exist in the external block store.
-   *
-   * @throws java.io.IOException if there is any file system failure in getting the block.
-   */
-  def getBytes(blockId: BlockId): Option[ByteBuffer]
-
-  /**
-   * Retrieve the block data.
-   * @return Some(Iterator[Any]) if the block data is successfully retrieved
-   *         None if the block does not exist in the external block store.
-   *
-   * @throws java.io.IOException if there is any file system failure in getting the block.
-   */
-  def getValues(blockId: BlockId): Option[Iterator[_]] = {
-    getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
-  }
-
-  /**
-   * Get the size of the block saved in the underlying external block store,
-   * which was saved earlier by putBytes.
-   * @return size of the block
-   *         0 if the block does not exist
-   *
-   * @throws java.io.IOException if there is any file system failure in getting the block size.
-   */
-  def getSize(blockId: BlockId): Long
-
-  /**
-   * Clean up any information persisted in the underlying external block store,
-   * e.g., directories and files. This is invoked by the shutdown hook of
-   * ExternalBlockStore during system shutdown.
-   *
-   */
-  def shutdown()
-}
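
For historical context, a minimal implementation of this now-removed API might have looked like the sketch below: an in-memory map standing in for the external store (hypothetical code; real implementations such as the Tachyon one stored bytes in an external file system and used Spark's BlockId type):

    import java.nio.ByteBuffer
    import scala.collection.concurrent.TrieMap

    // Hypothetical in-memory stand-in for the removed ExternalBlockManager contract.
    class InMemoryExternalBlockManager {
      private val store = TrieMap.empty[String, Array[Byte]]

      def putBytes(blockId: String, bytes: ByteBuffer): Unit = {
        val copy = new Array[Byte](bytes.remaining())
        bytes.duplicate().get(copy)  // copy without moving the caller's position
        store.put(blockId, copy)
      }

      def getBytes(blockId: String): Option[ByteBuffer] =
        store.get(blockId).map(ByteBuffer.wrap)

      def blockExists(blockId: String): Boolean = store.contains(blockId)

      def removeBlock(blockId: String): Boolean = store.remove(blockId).isDefined

      def getSize(blockId: String): Long =
        store.get(blockId).map(_.length.toLong).getOrElse(0L)

      def shutdown(): Unit = store.clear()
    }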

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
deleted file mode 100644
index 94883a5..0000000
--- a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.nio.ByteBuffer
-
-import scala.util.control.NonFatal
-
-import org.apache.spark.Logging
-import org.apache.spark.util.{ShutdownHookManager, Utils}
-
-
-/**
- * Stores BlockManager blocks in an ExternalBlockStore.
- * We capture any potential exception from the underlying implementation
- * and return the expected failure value.
- */
-private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId: String)
-  extends BlockStore(blockManager: BlockManager) with Logging {
-
-  lazy val externalBlockManager: Option[ExternalBlockManager] = createBlkManager()
-
-  logInfo("ExternalBlockStore started")
-
-  override def getSize(blockId: BlockId): Long = {
-    try {
-      externalBlockManager.map(_.getSize(blockId)).getOrElse(0)
-    } catch {
-      case NonFatal(t) =>
-        logError(s"Error in getSize($blockId)", t)
-        0L
-    }
-  }
-
-  override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = {
-    putIntoExternalBlockStore(blockId, bytes, returnValues = true)
-  }
-
-  override def putArray(
-      blockId: BlockId,
-      values: Array[Any],
-      level: StorageLevel,
-      returnValues: Boolean): PutResult = {
-    putIntoExternalBlockStore(blockId, values.toIterator, returnValues)
-  }
-
-  override def putIterator(
-      blockId: BlockId,
-      values: Iterator[Any],
-      level: StorageLevel,
-      returnValues: Boolean): PutResult = {
-    putIntoExternalBlockStore(blockId, values, returnValues)
-  }
-
-  private def putIntoExternalBlockStore(
-      blockId: BlockId,
-      values: Iterator[_],
-      returnValues: Boolean): PutResult = {
-    logTrace(s"Attempting to put block $blockId into ExternalBlockStore")
-    // We should never reach this point if externalBlockManager is None; handle it anyway for safety.
-    try {
-      val startTime = System.currentTimeMillis
-      if (externalBlockManager.isDefined) {
-        externalBlockManager.get.putValues(blockId, values)
-        val size = getSize(blockId)
-        val data = if (returnValues) {
-          Left(getValues(blockId).get)
-        } else {
-          null
-        }
-        val finishTime = System.currentTimeMillis
-        logDebug("Block %s stored as %s in ExternalBlockStore in %d ms".format(
-          blockId, Utils.bytesToString(size), finishTime - startTime))
-        PutResult(size, data)
-      } else {
-        logError(s"Error in putValues($blockId): no ExternalBlockManager has been configured")
-        PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
-      }
-    } catch {
-      case NonFatal(t) =>
-        logError(s"Error in putValues($blockId)", t)
-        PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
-    }
-  }
-
-  private def putIntoExternalBlockStore(
-      blockId: BlockId,
-      bytes: ByteBuffer,
-      returnValues: Boolean): PutResult = {
-    logTrace(s"Attempting to put block $blockId into ExternalBlockStore")
-    // We should never reach this point if externalBlockManager is None; handle it anyway for safety.
-    try {
-      val startTime = System.currentTimeMillis
-      if (externalBlockManager.isDefined) {
-        val byteBuffer = bytes.duplicate()
-        byteBuffer.rewind()
-        externalBlockManager.get.putBytes(blockId, byteBuffer)
-        val size = bytes.limit()
-        val data = if (returnValues) {
-          Right(bytes)
-        } else {
-          null
-        }
-        val finishTime = System.currentTimeMillis
-        logDebug("Block %s stored as %s in ExternalBlockStore in %d ms".format(
-          blockId, Utils.bytesToString(size), finishTime - startTime))
-        PutResult(size, data)
-      } else {
-        logError(s"Error in putBytes($blockId): no ExternalBlockManager has been configured")
-        PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
-      }
-    } catch {
-      case NonFatal(t) =>
-        logError(s"Error in putBytes($blockId)", t)
-        PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
-    }
-  }
-
-  // We assume the block is removed even if exception thrown
-  override def remove(blockId: BlockId): Boolean = {
-    try {
-      externalBlockManager.map(_.removeBlock(blockId)).getOrElse(true)
-    } catch {
-      case NonFatal(t) =>
-        logError(s"Error in removeBlock($blockId)", t)
-        true
-    }
-  }
-
-  override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
-    try {
-      externalBlockManager.flatMap(_.getValues(blockId))
-    } catch {
-      case NonFatal(t) =>
-        logError(s"Error in getValues($blockId)", t)
-        None
-    }
-  }
-
-  override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
-    try {
-      externalBlockManager.flatMap(_.getBytes(blockId))
-    } catch {
-      case NonFatal(t) =>
-        logError(s"Error in getBytes($blockId)", t)
-        None
-    }
-  }
-
-  override def contains(blockId: BlockId): Boolean = {
-    try {
-      val ret = externalBlockManager.map(_.blockExists(blockId)).getOrElse(false)
-      if (!ret) {
-        logInfo(s"Remove block $blockId")
-        blockManager.removeBlock(blockId, true)
-      }
-      ret
-    } catch {
-      case NonFatal(t) =>
-        logError(s"Error in blockExists($blockId)", t)
-        false
-    }
-  }
-
-  // Create the concrete block manager, falling back to Tachyon by default for backward compatibility.
-  private def createBlkManager(): Option[ExternalBlockManager] = {
-    val clsName = blockManager.conf.getOption(ExternalBlockStore.BLOCK_MANAGER_NAME)
-      .getOrElse(ExternalBlockStore.DEFAULT_BLOCK_MANAGER_NAME)
-
-    try {
-      val instance = Utils.classForName(clsName)
-        .newInstance()
-        .asInstanceOf[ExternalBlockManager]
-      instance.init(blockManager, executorId)
-      ShutdownHookManager.addShutdownHook { () =>
-        logDebug("Shutdown hook called")
-        externalBlockManager.map(_.shutdown())
-      }
-      Some(instance)
-    } catch {
-      case NonFatal(t) =>
-        logError("Cannot initialize external block store", t)
-        None
-    }
-  }
-}
-
-private[spark] object ExternalBlockStore extends Logging {
-  val MAX_DIR_CREATION_ATTEMPTS = 10
-  val SUB_DIRS_PER_DIR = "64"
-  val BASE_DIR = "spark.externalBlockStore.baseDir"
-  val FOLD_NAME = "spark.externalBlockStore.folderName"
-  val MASTER_URL = "spark.externalBlockStore.url"
-  val BLOCK_MANAGER_NAME = "spark.externalBlockStore.blockManager"
-  val DEFAULT_BLOCK_MANAGER_NAME = "org.apache.spark.storage.TachyonBlockManager"
-}
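
These constants were the user-facing configuration surface of the removed store. Before this commit, enabling it looked roughly like the following (a sketch for historical reference; none of these keys exist after this change):

    import org.apache.spark.SparkConf

    // Pre-removal configuration sketch; these keys are gone after this commit.
    val conf = new SparkConf()
      .set("spark.externalBlockStore.url", "tachyon://localhost:19998")
      .set("spark.externalBlockStore.baseDir", "/tmp_spark_tachyon")
      .set("spark.externalBlockStore.blockManager",
        "org.apache.spark.storage.TachyonBlockManager")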

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index 673f7ad..083d78b 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -19,7 +19,7 @@ package org.apache.spark.storage
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.{RDD, RDDOperationScope}
-import org.apache.spark.util.{CallSite, Utils}
+import org.apache.spark.util.Utils
 
 @DeveloperApi
 class RDDInfo(
@@ -37,15 +37,14 @@ class RDDInfo(
   var diskSize = 0L
   var externalBlockStoreSize = 0L
 
-  def isCached: Boolean =
-    (memSize + diskSize + externalBlockStoreSize > 0) && numCachedPartitions > 0
+  def isCached: Boolean = (memSize + diskSize > 0) && numCachedPartitions > 0
 
   override def toString: String = {
     import Utils.bytesToString
     ("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " +
-      "MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s").format(
+      "MemorySize: %s; DiskSize: %s").format(
         name, id, storageLevel.toString, numCachedPartitions, numPartitions,
-        bytesToString(memSize), bytesToString(externalBlockStoreSize), bytesToString(diskSize))
+        bytesToString(memSize), bytesToString(diskSize))
   }
 
   override def compare(that: RDDInfo): Int = {
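
The simplified isCached check keeps the same shape: an RDD counts as cached only if it stores bytes somewhere (memory or disk) and has at least one cached partition. A tiny stand-in illustrating the truth table (not Spark code):

    // Stand-in mirroring the new isCached logic above.
    case class Info(memSize: Long, diskSize: Long, numCachedPartitions: Int) {
      def isCached: Boolean = (memSize + diskSize > 0) && numCachedPartitions > 0
    }

    object IsCachedDemo extends App {
      println(Info(100, 0, 10).isCached)  // true: bytes in memory, partitions cached
      println(Info(0, 0, 10).isCached)    // false: no bytes stored anywhere
      println(Info(100, 0, 0).isCached)   // false: no cached partitions
    }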

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 703bce3..38e9534 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -150,7 +150,9 @@ object StorageLevel {
   val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
   val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
   val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
-  val OFF_HEAP = new StorageLevel(false, false, true, false)
+
+  // Redirect to MEMORY_ONLY_SER for now.
+  val OFF_HEAP = MEMORY_ONLY_SER
 
   /**
    * :: DeveloperApi ::
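
Since OFF_HEAP is now an alias for MEMORY_ONLY_SER rather than a distinct level, the two are reference-equal, and persisting with OFF_HEAP simply stores serialized data on the JVM heap. A quick illustration (assuming a Spark build at this commit):

    import org.apache.spark.storage.StorageLevel

    // OFF_HEAP is now the very same object as MEMORY_ONLY_SER.
    assert(StorageLevel.OFF_HEAP eq StorageLevel.MEMORY_ONLY_SER)
    // rdd.persist(StorageLevel.OFF_HEAP) is now equivalent to MEMORY_ONLY_SER.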

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index c4ac300..8e2cfb2 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -48,14 +48,14 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
    * non-RDD blocks for the same reason. In particular, RDD storage information is stored
   * in a map indexed by the RDD ID to the following 3-tuple:
    *
-   *   (memory size, disk size, off-heap size, storage level)
+   *   (memory size, disk size, storage level)
    *
    * We assume that all the blocks that belong to the same RDD have the same storage level.
    * This field is not relevant to non-RDD blocks, however, so the storage information for
   * non-RDD blocks contains only the first 2 fields (in the same order).
    */
-  private val _rddStorageInfo = new mutable.HashMap[Int, (Long, Long, Long, StorageLevel)]
-  private var _nonRddStorageInfo: (Long, Long, Long) = (0L, 0L, 0L)
+  private val _rddStorageInfo = new mutable.HashMap[Int, (Long, Long, StorageLevel)]
+  private var _nonRddStorageInfo: (Long, Long) = (0L, 0L)
 
   /** Create a storage status with an initial set of blocks, leaving the source unmodified. */
   def this(bmid: BlockManagerId, maxMem: Long, initialBlocks: Map[BlockId, BlockStatus]) {
@@ -177,20 +177,14 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
   /** Return the disk space used by this block manager. */
   def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
 
-  /** Return the off-heap space used by this block manager. */
-  def offHeapUsed: Long = _nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).sum
-
   /** Return the memory used by the given RDD in this block manager in O(1) time. */
   def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L)
 
   /** Return the disk space used by the given RDD in this block manager in O(1) time. */
   def diskUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._2).getOrElse(0L)
 
-  /** Return the off-heap space used by the given RDD in this block manager in O(1) time. */
-  def offHeapUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._3).getOrElse(0L)
-
   /** Return the storage level, if any, used by the given RDD in this block manager. */
-  def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_._4)
+  def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_._3)
 
   /**
    * Update the relevant storage info, taking into account any existing status for this block.
@@ -199,34 +193,31 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
     val oldBlockStatus = getBlock(blockId).getOrElse(BlockStatus.empty)
     val changeInMem = newBlockStatus.memSize - oldBlockStatus.memSize
     val changeInDisk = newBlockStatus.diskSize - oldBlockStatus.diskSize
-    val changeInExternalBlockStore =
-      newBlockStatus.externalBlockStoreSize - oldBlockStatus.externalBlockStoreSize
     val level = newBlockStatus.storageLevel
 
     // Compute new info from old info
-    val (oldMem, oldDisk, oldExternalBlockStore) = blockId match {
+    val (oldMem, oldDisk) = blockId match {
       case RDDBlockId(rddId, _) =>
         _rddStorageInfo.get(rddId)
-          .map { case (mem, disk, externalBlockStore, _) => (mem, disk, externalBlockStore) }
-          .getOrElse((0L, 0L, 0L))
+          .map { case (mem, disk, _) => (mem, disk) }
+          .getOrElse((0L, 0L))
       case _ =>
         _nonRddStorageInfo
     }
     val newMem = math.max(oldMem + changeInMem, 0L)
     val newDisk = math.max(oldDisk + changeInDisk, 0L)
-    val newExternalBlockStore = math.max(oldExternalBlockStore + changeInExternalBlockStore, 0L)
 
     // Set the correct info
     blockId match {
       case RDDBlockId(rddId, _) =>
         // If this RDD is no longer persisted, remove it
-        if (newMem + newDisk + newExternalBlockStore == 0) {
+        if (newMem + newDisk == 0) {
           _rddStorageInfo.remove(rddId)
         } else {
-          _rddStorageInfo(rddId) = (newMem, newDisk, newExternalBlockStore, level)
+          _rddStorageInfo(rddId) = (newMem, newDisk, level)
         }
       case _ =>
-        _nonRddStorageInfo = (newMem, newDisk, newExternalBlockStore)
+        _nonRddStorageInfo = (newMem, newDisk)
     }
   }
 
@@ -248,13 +239,11 @@ private[spark] object StorageUtils {
       val numCachedPartitions = statuses.map(_.numRddBlocksById(rddId)).sum
       val memSize = statuses.map(_.memUsedByRdd(rddId)).sum
       val diskSize = statuses.map(_.diskUsedByRdd(rddId)).sum
-      val externalBlockStoreSize = statuses.map(_.offHeapUsedByRdd(rddId)).sum
 
       rddInfo.storageLevel = storageLevel
       rddInfo.numCachedPartitions = numCachedPartitions
       rddInfo.memSize = memSize
       rddInfo.diskSize = diskSize
-      rddInfo.externalBlockStoreSize = externalBlockStoreSize
     }
   }
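
The update path above is delta-based: it subtracts the old block status from the new one, adds the delta to the running totals, and clamps at zero so a stale or duplicate removal can never drive a total negative. A standalone sketch of the same bookkeeping (simplified types, not Spark code):

    // Delta-based bookkeeping as in updateStorageInfo, with simplified types.
    case class Status(memSize: Long, diskSize: Long)

    object DeltaDemo extends App {
      var totals = (0L, 0L)

      def update(oldStatus: Status, newStatus: Status): Unit = {
        val newMem = math.max(totals._1 + (newStatus.memSize - oldStatus.memSize), 0L)
        val newDisk = math.max(totals._2 + (newStatus.diskSize - oldStatus.diskSize), 0L)
        totals = (newMem, newDisk)  // clamped at zero
      }

      update(Status(0, 0), Status(100, 20))   // block added:   totals = (100, 20)
      update(Status(100, 20), Status(0, 0))   // block removed: totals = (0, 0)
      println(totals)
    }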
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
deleted file mode 100644
index 6aa7e13..0000000
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.io.IOException
-import java.nio.ByteBuffer
-import java.text.SimpleDateFormat
-import java.util.{Date, Random}
-
-import scala.util.control.NonFatal
-
-import com.google.common.io.ByteStreams
-import tachyon.{Constants, TachyonURI}
-import tachyon.client.ClientContext
-import tachyon.client.file.{TachyonFile, TachyonFileSystem}
-import tachyon.client.file.TachyonFileSystem.TachyonFileSystemFactory
-import tachyon.client.file.options.DeleteOptions
-import tachyon.conf.TachyonConf
-import tachyon.exception.{FileAlreadyExistsException, FileDoesNotExistException}
-
-import org.apache.spark.Logging
-import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.util.Utils
-
-/**
- * Creates and maintains the logical mapping between logical blocks and Tachyon file system
- * locations. By default, one block is mapped to one file with a name given by its BlockId.
- *
- */
-private[spark] class TachyonBlockManager() extends ExternalBlockManager with Logging {
-
-  var rootDirs: String = _
-  var master: String = _
-  var client: TachyonFileSystem = _
-  private var subDirsPerTachyonDir: Int = _
-
-  // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName;
-  // then, inside this directory, create multiple subdirectories that we will hash files into,
-  // in order to avoid having really large inodes at the top level in Tachyon.
-  private var tachyonDirs: Array[TachyonFile] = _
-  private var subDirs: Array[Array[TachyonFile]] = _
-  private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()
-
-  override def init(blockManager: BlockManager, executorId: String): Unit = {
-    super.init(blockManager, executorId)
-    val storeDir = blockManager.conf.get(ExternalBlockStore.BASE_DIR, "/tmp_spark_tachyon")
-    val appFolderName = blockManager.conf.get(ExternalBlockStore.FOLD_NAME)
-
-    rootDirs = s"$storeDir/$appFolderName/$executorId"
-    master = blockManager.conf.get(ExternalBlockStore.MASTER_URL, "tachyon://localhost:19998")
-    client = if (master != null && master != "") {
-      val tachyonConf = new TachyonConf()
-      tachyonConf.set(Constants.MASTER_ADDRESS, master)
-      ClientContext.reset(tachyonConf)
-      TachyonFileSystemFactory.get
-    } else {
-      null
-    }
-    // The original implementation called System.exit; we changed it to run without
-    // external block store support.
-    if (client == null) {
-      logError("Failed to connect to the Tachyon as the master address is not configured")
-      throw new IOException("Failed to connect to the Tachyon as the master " +
-        "address is not configured")
-    }
-    subDirsPerTachyonDir = blockManager.conf.get("spark.externalBlockStore.subDirectories",
-      ExternalBlockStore.SUB_DIRS_PER_DIR).toInt
-
-    // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName;
-    // then, inside this directory, create multiple subdirectories that we will hash files into,
-    // in order to avoid having really large inodes at the top level in Tachyon.
-    tachyonDirs = createTachyonDirs()
-    subDirs = Array.fill(tachyonDirs.length)(new Array[TachyonFile](subDirsPerTachyonDir))
-    tachyonDirs.foreach(registerShutdownDeleteDir)
-  }
-
-  override def toString: String = {"ExternalBlockStore-Tachyon"}
-
-  override def removeBlock(blockId: BlockId): Boolean = {
-    val file = getFile(blockId)
-    if (fileExists(file)) {
-      removeFile(file)
-      true
-    } else {
-      false
-    }
-  }
-
-  override def blockExists(blockId: BlockId): Boolean = {
-    val file = getFile(blockId)
-    fileExists(file)
-  }
-
-  override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = {
-    val file = getFile(blockId)
-    val os = client.getOutStream(new TachyonURI(client.getInfo(file).getPath))
-    try {
-      Utils.writeByteBuffer(bytes, os)
-    } catch {
-      case NonFatal(e) =>
-        logWarning(s"Failed to put bytes of block $blockId into Tachyon", e)
-        os.cancel()
-    } finally {
-      os.close()
-    }
-  }
-
-  override def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
-    val file = getFile(blockId)
-    val os = client.getOutStream(new TachyonURI(client.getInfo(file).getPath))
-    try {
-      blockManager.dataSerializeStream(blockId, os, values)
-    } catch {
-      case NonFatal(e) =>
-        logWarning(s"Failed to put values of block $blockId into Tachyon", e)
-        os.cancel()
-    } finally {
-      os.close()
-    }
-  }
-
-  override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
-    val file = getFile(blockId)
-    if (file == null) {
-      return None
-    }
-    val is = try {
-      client.getInStream(file)
-    } catch {
-      case _: FileDoesNotExistException =>
-        return None
-    }
-    try {
-      val size = client.getInfo(file).length
-      val bs = new Array[Byte](size.asInstanceOf[Int])
-      ByteStreams.readFully(is, bs)
-      Some(ByteBuffer.wrap(bs))
-    } catch {
-      case NonFatal(e) =>
-        logWarning(s"Failed to get bytes of block $blockId from Tachyon", e)
-        None
-    } finally {
-      is.close()
-    }
-  }
-
-  override def getValues(blockId: BlockId): Option[Iterator[_]] = {
-    val file = getFile(blockId)
-    if (file == null) {
-      return None
-    }
-    val is = try {
-      client.getInStream(file)
-    } catch {
-      case _: FileDoesNotExistException =>
-        return None
-    }
-    try {
-      Some(blockManager.dataDeserializeStream(blockId, is))
-    } finally {
-      is.close()
-    }
-  }
-
-  override def getSize(blockId: BlockId): Long = {
-    client.getInfo(getFile(blockId.name)).length
-  }
-
-  def removeFile(file: TachyonFile): Unit = {
-    client.delete(file)
-  }
-
-  def fileExists(file: TachyonFile): Boolean = {
-    try {
-      client.getInfo(file)
-      true
-    } catch {
-      case _: FileDoesNotExistException => false
-    }
-  }
-
-  def getFile(filename: String): TachyonFile = {
-    // Figure out which tachyon directory it hashes to, and which subdirectory in that
-    val hash = Utils.nonNegativeHash(filename)
-    val dirId = hash % tachyonDirs.length
-    val subDirId = (hash / tachyonDirs.length) % subDirsPerTachyonDir
-
-    // Create the subdirectory if it doesn't already exist
-    var subDir = subDirs(dirId)(subDirId)
-    if (subDir == null) {
-      subDir = subDirs(dirId).synchronized {
-        val old = subDirs(dirId)(subDirId)
-        if (old != null) {
-          old
-        } else {
-          val path = new TachyonURI(s"${tachyonDirs(dirId)}/${"%02x".format(subDirId)}")
-          client.mkdir(path)
-          val newDir = client.loadMetadata(path)
-          subDirs(dirId)(subDirId) = newDir
-          newDir
-        }
-      }
-    }
-    val filePath = new TachyonURI(s"$subDir/$filename")
-    try {
-      client.create(filePath)
-    } catch {
-      case _: FileAlreadyExistsException => client.loadMetadata(filePath)
-    }
-  }
-
-  def getFile(blockId: BlockId): TachyonFile = getFile(blockId.name)
-
-  // TODO: Some of the logic here could be consolidated/de-duplicated with that in the DiskStore.
-  private def createTachyonDirs(): Array[TachyonFile] = {
-    logDebug("Creating tachyon directories at root dirs '" + rootDirs + "'")
-    val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
-    rootDirs.split(",").map { rootDir =>
-      var foundLocalDir = false
-      var tachyonDir: TachyonFile = null
-      var tachyonDirId: String = null
-      var tries = 0
-      val rand = new Random()
-      while (!foundLocalDir && tries < ExternalBlockStore.MAX_DIR_CREATION_ATTEMPTS) {
-        tries += 1
-        try {
-          tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
-          val path = new TachyonURI(s"$rootDir/spark-tachyon-$tachyonDirId")
-          try {
-            foundLocalDir = client.mkdir(path)
-            tachyonDir = client.loadMetadata(path)
-          } catch {
-            case _: FileAlreadyExistsException => // continue
-          }
-        } catch {
-          case NonFatal(e) =>
-            logWarning("Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed", e)
-        }
-      }
-      if (!foundLocalDir) {
-        logError("Failed " + ExternalBlockStore.MAX_DIR_CREATION_ATTEMPTS
-          + " attempts to create tachyon dir in " + rootDir)
-        System.exit(ExecutorExitCode.EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR)
-      }
-      logInfo("Created tachyon directory at " + tachyonDir)
-      tachyonDir
-    }
-  }
-
-  override def shutdown() {
-    logDebug("Shutdown hook called")
-    tachyonDirs.foreach { tachyonDir =>
-      try {
-        if (!hasRootAsShutdownDeleteDir(tachyonDir)) {
-          deleteRecursively(tachyonDir, client)
-        }
-      } catch {
-        case NonFatal(e) =>
-          logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
-      }
-    }
-  }
-
-  /**
-   * Delete a file or directory and its contents recursively.
-   */
-  private def deleteRecursively(dir: TachyonFile, client: TachyonFileSystem) {
-    client.delete(dir, new DeleteOptions.Builder(ClientContext.getConf).setRecursive(true).build())
-  }
-
-  // Register the tachyon path to be deleted via shutdown hook
-  private def registerShutdownDeleteDir(file: TachyonFile) {
-    val absolutePath = client.getInfo(file).getPath
-    shutdownDeleteTachyonPaths.synchronized {
-      shutdownDeleteTachyonPaths += absolutePath
-    }
-  }
-
-  // Remove the tachyon path to be deleted via shutdown hook
-  private def removeShutdownDeleteDir(file: TachyonFile) {
-    val absolutePath = client.getInfo(file).getPath
-    shutdownDeleteTachyonPaths.synchronized {
-      shutdownDeleteTachyonPaths -= absolutePath
-    }
-  }
-
-  // Is the path already registered to be deleted via a shutdown hook?
-  private def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
-    val absolutePath = client.getInfo(file).getPath
-    shutdownDeleteTachyonPaths.synchronized {
-      shutdownDeleteTachyonPaths.contains(absolutePath)
-    }
-  }
-
-  // Note: if the file is a child of some registered path, but not equal to it, return true;
-  // otherwise false. This ensures that two shutdown hooks do not try to delete each other's
-  // paths, which would result in exceptions and incomplete cleanup.
-  private def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = {
-    val absolutePath = client.getInfo(file).getPath
-    val hasRoot = shutdownDeleteTachyonPaths.synchronized {
-      shutdownDeleteTachyonPaths.exists(
-        path => !absolutePath.equals(path) && absolutePath.startsWith(path))
-    }
-    if (hasRoot) {
-      logInfo(s"path = $absolutePath, already present as root for deletion.")
-    }
-    hasRoot
-  }
-
-}
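
The getFile logic above hashed a block's file name into one of tachyonDirs.length top-level directories and then into one of subDirsPerTachyonDir subdirectories, precisely to avoid very large inodes at the top level. A standalone sketch of the placement arithmetic (hypothetical sizes; nonNegativeHash stands in for Spark's Utils.nonNegativeHash):

    object PlacementDemo extends App {
      // Simple non-negative hash; stands in for Spark's Utils.nonNegativeHash.
      def nonNegativeHash(s: String): Int = s.hashCode & Integer.MAX_VALUE

      val numDirs = 4         // tachyonDirs.length in the removed code
      val subDirsPerDir = 64  // spark.externalBlockStore.subDirectories default

      def placement(filename: String): (Int, Int) = {
        val hash = nonNegativeHash(filename)
        val dirId = hash % numDirs
        val subDirId = (hash / numDirs) % subDirsPerDir
        (dirId, subDirId)
      }

      println(placement("rdd_0_0"))  // a (dirId, subDirId) pair within bounds
    }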

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
index 04f5846..c9bb49b 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
@@ -54,7 +54,6 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
     "Cached Partitions",
     "Fraction Cached",
     "Size in Memory",
-    "Size in ExternalBlockStore",
     "Size on Disk")
 
   /** Render an HTML row representing an RDD */
@@ -71,7 +70,6 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
       <td>{rdd.numCachedPartitions.toString}</td>
       <td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td>
       <td sorttable_customkey={rdd.memSize.toString}>{Utils.bytesToString(rdd.memSize)}</td>
-      <td sorttable_customkey={rdd.externalBlockStoreSize.toString}>{Utils.bytesToString(rdd.externalBlockStoreSize)}</td>
       <td sorttable_customkey={rdd.diskSize.toString} >{Utils.bytesToString(rdd.diskSize)}</td>
     </tr>
     // scalastyle:on
@@ -104,7 +102,6 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
     "Executor ID",
     "Address",
     "Total Size in Memory",
-    "Total Size in ExternalBlockStore",
     "Total Size on Disk",
     "Stream Blocks")
 
@@ -119,9 +116,6 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
       <td sorttable_customkey={status.totalMemSize.toString}>
         {Utils.bytesToString(status.totalMemSize)}
       </td>
-      <td sorttable_customkey={status.totalExternalBlockStoreSize.toString}>
-        {Utils.bytesToString(status.totalExternalBlockStoreSize)}
-      </td>
       <td sorttable_customkey={status.totalDiskSize.toString}>
         {Utils.bytesToString(status.totalDiskSize)}
       </td>
@@ -195,8 +189,6 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
       ("Memory", block.memSize)
     } else if (block.storageLevel.useMemory && !block.storageLevel.deserialized) {
       ("Memory Serialized", block.memSize)
-    } else if (block.storageLevel.useOffHeap) {
-      ("External", block.externalBlockStoreSize)
     } else {
       throw new IllegalStateException(s"Invalid Storage Level: ${block.storageLevel}")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a62fd2f..a6460bc 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -409,14 +409,12 @@ private[spark] object JsonProtocol {
     ("Number of Partitions" -> rddInfo.numPartitions) ~
     ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
     ("Memory Size" -> rddInfo.memSize) ~
-    ("ExternalBlockStore Size" -> rddInfo.externalBlockStoreSize) ~
     ("Disk Size" -> rddInfo.diskSize)
   }
 
   def storageLevelToJson(storageLevel: StorageLevel): JValue = {
     ("Use Disk" -> storageLevel.useDisk) ~
     ("Use Memory" -> storageLevel.useMemory) ~
-    ("Use ExternalBlockStore" -> storageLevel.useOffHeap) ~
     ("Deserialized" -> storageLevel.deserialized) ~
     ("Replication" -> storageLevel.replication)
   }
@@ -425,7 +423,6 @@ private[spark] object JsonProtocol {
     val storageLevel = storageLevelToJson(blockStatus.storageLevel)
     ("Storage Level" -> storageLevel) ~
     ("Memory Size" -> blockStatus.memSize) ~
-    ("ExternalBlockStore Size" -> blockStatus.externalBlockStoreSize) ~
     ("Disk Size" -> blockStatus.diskSize)
   }
 
@@ -867,15 +864,11 @@ private[spark] object JsonProtocol {
     val numPartitions = (json \ "Number of Partitions").extract[Int]
     val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int]
     val memSize = (json \ "Memory Size").extract[Long]
-    // fallback to tachyon for backward compatibility
-    val externalBlockStoreSize = (json \ "ExternalBlockStore Size").toSome
-      .getOrElse(json \ "Tachyon Size").extract[Long]
     val diskSize = (json \ "Disk Size").extract[Long]
 
     val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, parentIds, callsite, scope)
     rddInfo.numCachedPartitions = numCachedPartitions
     rddInfo.memSize = memSize
-    rddInfo.externalBlockStoreSize = externalBlockStoreSize
     rddInfo.diskSize = diskSize
     rddInfo
   }
@@ -883,22 +876,16 @@ private[spark] object JsonProtocol {
   def storageLevelFromJson(json: JValue): StorageLevel = {
     val useDisk = (json \ "Use Disk").extract[Boolean]
     val useMemory = (json \ "Use Memory").extract[Boolean]
-    // fallback to tachyon for backward compatibility
-    val useExternalBlockStore = (json \ "Use ExternalBlockStore").toSome
-      .getOrElse(json \ "Use Tachyon").extract[Boolean]
     val deserialized = (json \ "Deserialized").extract[Boolean]
     val replication = (json \ "Replication").extract[Int]
-    StorageLevel(useDisk, useMemory, useExternalBlockStore, deserialized, replication)
+    StorageLevel(useDisk, useMemory, deserialized, replication)
   }
 
   def blockStatusFromJson(json: JValue): BlockStatus = {
     val storageLevel = storageLevelFromJson(json \ "Storage Level")
     val memorySize = (json \ "Memory Size").extract[Long]
     val diskSize = (json \ "Disk Size").extract[Long]
-    // fallback to tachyon for backward compatibility
-    val externalBlockStoreSize = (json \ "ExternalBlockStore Size").toSome
-      .getOrElse(json \ "Tachyon Size").extract[Long]
-    BlockStatus(storageLevel, memorySize, diskSize, externalBlockStoreSize)
+    BlockStatus(storageLevel, memorySize, diskSize)
   }
 
   def executorInfoFromJson(json: JValue): ExecutorInfo = {
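
After this change the persisted JSON simply omits the "Use ExternalBlockStore" / "ExternalBlockStore Size" fields, and the read side no longer needs the Tachyon fallback. A round-trip sketch using the same json4s DSL as above (assuming json4s-jackson on the classpath):

    import org.json4s._
    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods._

    object StorageLevelJsonDemo extends App {
      implicit val formats: Formats = DefaultFormats

      // Shape written by storageLevelToJson after this commit.
      val json: JValue =
        ("Use Disk" -> true) ~
        ("Use Memory" -> true) ~
        ("Deserialized" -> false) ~
        ("Replication" -> 2)

      println(compact(render(json)))
      // {"Use Disk":true,"Use Memory":true,"Deserialized":false,"Replication":2}

      // Extraction mirrors storageLevelFromJson.
      val useDisk = (json \ "Use Disk").extract[Boolean]     // true
      val replication = (json \ "Replication").extract[Int]  // 2
      println(s"useDisk=$useDisk replication=$replication")
    }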

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index f2924a6..3b23687 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -102,14 +102,14 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
           // We can evict enough blocks to fulfill the request for space
           mm.releaseStorageMemory(numBytesToFree)
           args.last.asInstanceOf[mutable.Buffer[(BlockId, BlockStatus)]].append(
-            (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L, 0L)))
+            (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L)))
           // We need to add this call so that the suite-level `evictedBlocks` is updated when
           // execution evicts storage; in that case, args.last will not be equal to evictedBlocks
           // because it will be a temporary buffer created inside of the MemoryManager rather than
           // being passed in by the test code.
           if (!(evictedBlocks eq args.last)) {
             evictedBlocks.append(
-              (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L, 0L)))
+              (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L)))
           }
           true
         } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
index e694f5e..2802cd9 100644
--- a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.rdd
 
-import org.mockito.Mockito.spy
-
 import org.apache.spark.{LocalSparkContext, SparkContext, SparkException, SparkFunSuite}
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
 
@@ -46,10 +44,6 @@ class LocalCheckpointSuite extends SparkFunSuite with LocalSparkContext {
     assert(transform(StorageLevel.MEMORY_AND_DISK_SER) === StorageLevel.MEMORY_AND_DISK_SER)
     assert(transform(StorageLevel.MEMORY_AND_DISK_2) === StorageLevel.MEMORY_AND_DISK_2)
     assert(transform(StorageLevel.MEMORY_AND_DISK_SER_2) === StorageLevel.MEMORY_AND_DISK_SER_2)
-    // Off-heap is not supported and Spark should fail fast
-    intercept[SparkException] {
-      transform(StorageLevel.OFF_HEAP)
-    }
   }
 
   test("basic lineage truncation") {

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 62e6c4f..0f31561 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -121,11 +121,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }
 
   test("StorageLevel object caching") {
-    val level1 = StorageLevel(false, false, false, false, 3)
+    val level1 = StorageLevel(false, false, false, 3)
     // this should return the same object as level1
-    val level2 = StorageLevel(false, false, false, false, 3)
+    val level2 = StorageLevel(false, false, false, 3)
     // this should return a different object
-    val level3 = StorageLevel(false, false, false, false, 2)
+    val level3 = StorageLevel(false, false, false, 2)
     assert(level2 === level1, "level2 is not same as level1")
     assert(level2.eq(level1), "level2 is not the same object as level1")
     assert(level3 != level1, "level3 is same as level1")
@@ -562,26 +562,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.memoryStore.contains(rdd(0, 3)), "rdd_0_3 was not in store")
   }
 
-  test("tachyon storage") {
-    // TODO Make the spark.test.tachyon.enable true after using tachyon 0.5.0 testing jar.
-    val tachyonUnitTestEnabled = conf.getBoolean("spark.test.tachyon.enable", false)
-    conf.set(ExternalBlockStore.BLOCK_MANAGER_NAME, ExternalBlockStore.DEFAULT_BLOCK_MANAGER_NAME)
-    if (tachyonUnitTestEnabled) {
-      store = makeBlockManager(1200)
-      val a1 = new Array[Byte](400)
-      val a2 = new Array[Byte](400)
-      val a3 = new Array[Byte](400)
-      store.putSingle("a1", a1, StorageLevel.OFF_HEAP)
-      store.putSingle("a2", a2, StorageLevel.OFF_HEAP)
-      store.putSingle("a3", a3, StorageLevel.OFF_HEAP)
-      assert(store.getSingle("a3").isDefined, "a3 was in store")
-      assert(store.getSingle("a2").isDefined, "a2 was in store")
-      assert(store.getSingle("a1").isDefined, "a1 was in store")
-    } else {
-      info("tachyon storage test disabled.")
-    }
-  }
-
   test("on-disk storage") {
     store = makeBlockManager(1200)
     val a1 = new Array[Byte](400)
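
The "StorageLevel object caching" test earlier in this file works because StorageLevel.apply memoizes instances, so structurally equal levels are also reference-equal; with the off-heap flag gone, the factory takes one fewer boolean. A quick illustration (assuming a Spark build at this commit):

    import org.apache.spark.storage.StorageLevel

    // apply(useDisk, useMemory, deserialized, replication) returns a cached instance.
    val a = StorageLevel(false, false, false, 3)
    val b = StorageLevel(false, false, false, 3)
    assert(a eq b)  // same interned object, not just ==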

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/test/scala/org/apache/spark/storage/BlockStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockStatusListenerSuite.scala
index d7ffde1..06acca3 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockStatusListenerSuite.scala
@@ -34,16 +34,14 @@ class BlockStatusListenerSuite extends SparkFunSuite {
         StreamBlockId(0, 100),
         StorageLevel.MEMORY_AND_DISK,
         memSize = 100,
-        diskSize = 100,
-        externalBlockStoreSize = 0)))
+        diskSize = 100)))
     // The new block status should be added to the listener
     val expectedBlock = BlockUIData(
       StreamBlockId(0, 100),
       "localhost:10000",
       StorageLevel.MEMORY_AND_DISK,
       memSize = 100,
-      diskSize = 100,
-      externalBlockStoreSize = 0
+      diskSize = 100
     )
     val expectedExecutorStreamBlockStatus = Seq(
       ExecutorStreamBlockStatus("0", "localhost:10000", Seq(expectedBlock))
@@ -60,15 +58,13 @@ class BlockStatusListenerSuite extends SparkFunSuite {
         StreamBlockId(0, 100),
         StorageLevel.MEMORY_AND_DISK,
         memSize = 100,
-        diskSize = 100,
-        externalBlockStoreSize = 0)))
+        diskSize = 100)))
     val expectedBlock2 = BlockUIData(
       StreamBlockId(0, 100),
       "localhost:10001",
       StorageLevel.MEMORY_AND_DISK,
       memSize = 100,
-      diskSize = 100,
-      externalBlockStoreSize = 0
+      diskSize = 100
     )
     // Each block manager should contain one block
     val expectedExecutorStreamBlockStatus2 = Set(
@@ -84,8 +80,7 @@ class BlockStatusListenerSuite extends SparkFunSuite {
         StreamBlockId(0, 100),
         StorageLevel.NONE, // StorageLevel.NONE means removing it
         memSize = 0,
-        diskSize = 0,
-        externalBlockStoreSize = 0)))
+        diskSize = 0)))
     // Only the first block manager contains a block
     val expectedExecutorStreamBlockStatus3 = Set(
       ExecutorStreamBlockStatus("0", "localhost:10000", Seq(expectedBlock)),
@@ -102,8 +97,7 @@ class BlockStatusListenerSuite extends SparkFunSuite {
         StreamBlockId(0, 100),
         StorageLevel.MEMORY_AND_DISK,
         memSize = 100,
-        diskSize = 100,
-        externalBlockStoreSize = 0)))
+        diskSize = 100)))
     // The second block manager is removed so we should not see the new block
     val expectedExecutorStreamBlockStatus4 = Seq(
       ExecutorStreamBlockStatus("0", "localhost:10000", Seq(expectedBlock))

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
index 1a199be..355d80d 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -82,9 +82,9 @@ class StorageStatusListenerSuite extends SparkFunSuite {
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
     val taskMetrics1 = new TaskMetrics
     val taskMetrics2 = new TaskMetrics
-    val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
-    val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L))
-    val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L))
+    val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L))
+    val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L))
+    val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L))
     taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
     taskMetrics2.updatedBlocks = Some(Seq(block3))
 
@@ -105,9 +105,9 @@ class StorageStatusListenerSuite extends SparkFunSuite {
     assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))
 
     // Task end with dropped blocks
-    val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
-    val droppedBlock2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
-    val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
+    val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L))
+    val droppedBlock2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.NONE, 0L, 0L))
+    val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L))
     taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3))
     taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3))
 
@@ -130,9 +130,9 @@ class StorageStatusListenerSuite extends SparkFunSuite {
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     val taskMetrics1 = new TaskMetrics
     val taskMetrics2 = new TaskMetrics
-    val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
-    val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L))
-    val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L))
+    val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L))
+    val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L))
+    val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L))
     taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
     taskMetrics2.updatedBlocks = Some(Seq(block3))
     listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1))

http://git-wip-us.apache.org/repos/asf/spark/blob/ad1503f9/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
index 1d5a813..e5733ae 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
@@ -33,10 +33,9 @@ class StorageSuite extends SparkFunSuite {
     assert(status.memUsed === 0L)
     assert(status.memRemaining === 1000L)
     assert(status.diskUsed === 0L)
-    assert(status.offHeapUsed === 0L)
-    status.addBlock(TestBlockId("foo"), BlockStatus(memAndDisk, 10L, 20L, 1L))
-    status.addBlock(TestBlockId("fee"), BlockStatus(memAndDisk, 10L, 20L, 1L))
-    status.addBlock(TestBlockId("faa"), BlockStatus(memAndDisk, 10L, 20L, 1L))
+    status.addBlock(TestBlockId("foo"), BlockStatus(memAndDisk, 10L, 20L))
+    status.addBlock(TestBlockId("fee"), BlockStatus(memAndDisk, 10L, 20L))
+    status.addBlock(TestBlockId("faa"), BlockStatus(memAndDisk, 10L, 20L))
     status
   }
 
@@ -50,18 +49,16 @@ class StorageSuite extends SparkFunSuite {
     assert(status.memUsed === 30L)
     assert(status.memRemaining === 970L)
     assert(status.diskUsed === 60L)
-    assert(status.offHeapUsed === 3L)
   }
 
   test("storage status update non-RDD blocks") {
     val status = storageStatus1
-    status.updateBlock(TestBlockId("foo"), BlockStatus(memAndDisk, 50L, 100L, 1L))
-    status.updateBlock(TestBlockId("fee"), BlockStatus(memAndDisk, 100L, 20L, 0L))
+    status.updateBlock(TestBlockId("foo"), BlockStatus(memAndDisk, 50L, 100L))
+    status.updateBlock(TestBlockId("fee"), BlockStatus(memAndDisk, 100L, 20L))
     assert(status.blocks.size === 3)
     assert(status.memUsed === 160L)
     assert(status.memRemaining === 840L)
     assert(status.diskUsed === 140L)
-    assert(status.offHeapUsed === 2L)
   }
 
   test("storage status remove non-RDD blocks") {
@@ -73,20 +70,19 @@ class StorageSuite extends SparkFunSuite {
     assert(status.memUsed === 10L)
     assert(status.memRemaining === 990L)
     assert(status.diskUsed === 20L)
-    assert(status.offHeapUsed === 1L)
   }
 
   // For testing add, update, remove, get, and contains etc. for both RDD and non-RDD blocks
   private def storageStatus2: StorageStatus = {
     val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L)
     assert(status.rddBlocks.isEmpty)
-    status.addBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 10L, 20L, 0L))
-    status.addBlock(TestBlockId("man"), BlockStatus(memAndDisk, 10L, 20L, 0L))
-    status.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 10L, 20L, 1L))
-    status.addBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 100L, 200L, 1L))
-    status.addBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 10L, 20L, 1L))
-    status.addBlock(RDDBlockId(2, 3), BlockStatus(memAndDisk, 10L, 20L, 0L))
-    status.addBlock(RDDBlockId(2, 4), BlockStatus(memAndDisk, 10L, 40L, 0L))
+    status.addBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 10L, 20L))
+    status.addBlock(TestBlockId("man"), BlockStatus(memAndDisk, 10L, 20L))
+    status.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 10L, 20L))
+    status.addBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 100L, 200L))
+    status.addBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 10L, 20L))
+    status.addBlock(RDDBlockId(2, 3), BlockStatus(memAndDisk, 10L, 20L))
+    status.addBlock(RDDBlockId(2, 4), BlockStatus(memAndDisk, 10L, 40L))
     status
   }
 
@@ -113,9 +109,6 @@ class StorageSuite extends SparkFunSuite {
     assert(status.diskUsedByRdd(0) === 20L)
     assert(status.diskUsedByRdd(1) === 200L)
     assert(status.diskUsedByRdd(2) === 80L)
-    assert(status.offHeapUsedByRdd(0) === 1L)
-    assert(status.offHeapUsedByRdd(1) === 1L)
-    assert(status.offHeapUsedByRdd(2) === 1L)
     assert(status.rddStorageLevel(0) === Some(memAndDisk))
     assert(status.rddStorageLevel(1) === Some(memAndDisk))
     assert(status.rddStorageLevel(2) === Some(memAndDisk))
@@ -124,15 +117,14 @@ class StorageSuite extends SparkFunSuite {
     assert(status.rddBlocksById(10).isEmpty)
     assert(status.memUsedByRdd(10) === 0L)
     assert(status.diskUsedByRdd(10) === 0L)
-    assert(status.offHeapUsedByRdd(10) === 0L)
     assert(status.rddStorageLevel(10) === None)
   }
 
   test("storage status update RDD blocks") {
     val status = storageStatus2
-    status.updateBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 5000L, 0L, 0L))
-    status.updateBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 0L, 0L, 0L))
-    status.updateBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 0L, 1000L, 0L))
+    status.updateBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 5000L, 0L))
+    status.updateBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 0L, 0L))
+    status.updateBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 0L, 1000L))
     assert(status.blocks.size === 7)
     assert(status.rddBlocks.size === 5)
     assert(status.rddBlocksById(0).size === 1)
@@ -144,9 +136,6 @@ class StorageSuite extends SparkFunSuite {
     assert(status.diskUsedByRdd(0) === 0L)
     assert(status.diskUsedByRdd(1) === 200L)
     assert(status.diskUsedByRdd(2) === 1060L)
-    assert(status.offHeapUsedByRdd(0) === 0L)
-    assert(status.offHeapUsedByRdd(1) === 1L)
-    assert(status.offHeapUsedByRdd(2) === 0L)
   }
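
The 1060 above is worth spelling out: updateBlock replaces the stored status of an existing block, so RDD 2's disk usage moves from 20 + 20 + 40 = 80 in the fixture to 1000 + 20 + 40 = 1060 once block (2, 2) is updated. Under the same assumptions:

    val status = storageStatus2
    status.updateBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 0L, 1000L))
    assert(status.diskUsedByRdd(2) === 1060L)  // 1000 + 20 + 40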
 
   test("storage status remove RDD blocks") {
@@ -170,9 +159,6 @@ class StorageSuite extends SparkFunSuite {
     assert(status.diskUsedByRdd(0) === 20L)
     assert(status.diskUsedByRdd(1) === 0L)
     assert(status.diskUsedByRdd(2) === 20L)
-    assert(status.offHeapUsedByRdd(0) === 1L)
-    assert(status.offHeapUsedByRdd(1) === 0L)
-    assert(status.offHeapUsedByRdd(2) === 0L)
   }
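
Removal is the mirror image: once every block of an RDD is dropped, its per-RDD accounting reads zero, which is what the (1) === 0L assertion above checks, since RDD 1's only block in the fixture is (1, 1). Under the same assumptions:

    val status = storageStatus2
    status.removeBlock(RDDBlockId(1, 1))  // RDD 1's only block
    assert(status.rddBlocksById(1).isEmpty)
    assert(status.diskUsedByRdd(1) === 0L)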
 
   test("storage status containsBlock") {
@@ -209,17 +195,17 @@ class StorageSuite extends SparkFunSuite {
     val status = storageStatus2
     assert(status.blocks.size === status.numBlocks)
     assert(status.rddBlocks.size === status.numRddBlocks)
-    status.addBlock(TestBlockId("Foo"), BlockStatus(memAndDisk, 0L, 0L, 100L))
-    status.addBlock(RDDBlockId(4, 4), BlockStatus(memAndDisk, 0L, 0L, 100L))
-    status.addBlock(RDDBlockId(4, 8), BlockStatus(memAndDisk, 0L, 0L, 100L))
+    status.addBlock(TestBlockId("Foo"), BlockStatus(memAndDisk, 0L, 0L))
+    status.addBlock(RDDBlockId(4, 4), BlockStatus(memAndDisk, 0L, 0L))
+    status.addBlock(RDDBlockId(4, 8), BlockStatus(memAndDisk, 0L, 0L))
     assert(status.blocks.size === status.numBlocks)
     assert(status.rddBlocks.size === status.numRddBlocks)
     assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
     assert(status.rddBlocksById(10).size === status.numRddBlocksById(10))
-    status.updateBlock(TestBlockId("Foo"), BlockStatus(memAndDisk, 0L, 10L, 400L))
-    status.updateBlock(RDDBlockId(4, 0), BlockStatus(memAndDisk, 0L, 0L, 100L))
-    status.updateBlock(RDDBlockId(4, 8), BlockStatus(memAndDisk, 0L, 0L, 100L))
-    status.updateBlock(RDDBlockId(10, 10), BlockStatus(memAndDisk, 0L, 0L, 100L))
+    status.updateBlock(TestBlockId("Foo"), BlockStatus(memAndDisk, 0L, 10L))
+    status.updateBlock(RDDBlockId(4, 0), BlockStatus(memAndDisk, 0L, 0L))
+    status.updateBlock(RDDBlockId(4, 8), BlockStatus(memAndDisk, 0L, 0L))
+    status.updateBlock(RDDBlockId(10, 10), BlockStatus(memAndDisk, 0L, 0L))
     assert(status.blocks.size === status.numBlocks)
     assert(status.rddBlocks.size === status.numRddBlocks)
     assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
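
What this test pins down is that numBlocks, numRddBlocks, and numRddBlocksById are cached counters that must agree with recounting the underlying maps after every mutation. Note the updateBlock calls on ids that were never added, such as RDDBlockId(10, 10): the assertions only hold if updating an unknown block is handled consistently in both the counter and the map (the behavior suggested by these assertions is that it acts like an insert). A sketch of the invariant, under the same assumptions:

    val status = storageStatus2
    status.updateBlock(RDDBlockId(10, 10), BlockStatus(memAndDisk, 0L, 0L))

    // O(1) counters must match an O(n) recount after the mutation.
    assert(status.numBlocks === status.blocks.size)
    assert(status.numRddBlocksById(10) === status.rddBlocksById(10).size)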
@@ -244,29 +230,24 @@ class StorageSuite extends SparkFunSuite {
     val status = storageStatus2
     def actualMemUsed: Long = status.blocks.values.map(_.memSize).sum
     def actualDiskUsed: Long = status.blocks.values.map(_.diskSize).sum
-    def actualOffHeapUsed: Long = status.blocks.values.map(_.externalBlockStoreSize).sum
     assert(status.memUsed === actualMemUsed)
     assert(status.diskUsed === actualDiskUsed)
-    assert(status.offHeapUsed === actualOffHeapUsed)
-    status.addBlock(TestBlockId("fire"), BlockStatus(memAndDisk, 4000L, 5000L, 6000L))
-    status.addBlock(TestBlockId("wire"), BlockStatus(memAndDisk, 400L, 500L, 600L))
-    status.addBlock(RDDBlockId(25, 25), BlockStatus(memAndDisk, 40L, 50L, 60L))
+    status.addBlock(TestBlockId("fire"), BlockStatus(memAndDisk, 4000L, 5000L))
+    status.addBlock(TestBlockId("wire"), BlockStatus(memAndDisk, 400L, 500L))
+    status.addBlock(RDDBlockId(25, 25), BlockStatus(memAndDisk, 40L, 50L))
     assert(status.memUsed === actualMemUsed)
     assert(status.diskUsed === actualDiskUsed)
-    assert(status.offHeapUsed === actualOffHeapUsed)
-    status.updateBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 4L, 5L, 6L))
-    status.updateBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 4L, 5L, 6L))
-    status.updateBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 4L, 5L, 6L))
+    status.updateBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 4L, 5L))
+    status.updateBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 4L, 5L))
+    status.updateBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 4L, 5L))
     assert(status.memUsed === actualMemUsed)
     assert(status.diskUsed === actualDiskUsed)
-    assert(status.offHeapUsed === actualOffHeapUsed)
     status.removeBlock(TestBlockId("fire"))
     status.removeBlock(TestBlockId("man"))
     status.removeBlock(RDDBlockId(2, 2))
     status.removeBlock(RDDBlockId(2, 3))
     assert(status.memUsed === actualMemUsed)
     assert(status.diskUsed === actualDiskUsed)
-    assert(status.offHeapUsed === actualOffHeapUsed)
   }
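
The pattern in this test (mutate, then compare the cached memUsed/diskUsed against a full scan of blocks) is the entire contract of the aggregate counters, and with the external block store gone, memory and disk are the only two dimensions left to track. A compact restatement of the check, under the same assumptions:

    def checkAggregates(status: StorageStatus): Unit = {
      // Cached aggregates must always equal a full recomputation.
      assert(status.memUsed === status.blocks.values.map(_.memSize).sum)
      assert(status.diskUsed === status.blocks.values.map(_.diskSize).sum)
    }

    val status = storageStatus2
    status.addBlock(TestBlockId("fire"), BlockStatus(memAndDisk, 4000L, 5000L))
    checkAggregates(status)
    status.removeBlock(TestBlockId("fire"))
    checkAggregates(status)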
 
   // For testing StorageUtils.updateRddInfo and StorageUtils.getRddBlockLocations
@@ -274,14 +255,14 @@ class StorageSuite extends SparkFunSuite {
     val status1 = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L)
     val status2 = new StorageStatus(BlockManagerId("fat", "duck", 2), 2000L)
     val status3 = new StorageStatus(BlockManagerId("fat", "cat", 3), 3000L)
-    status1.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L, 0L))
-    status1.addBlock(RDDBlockId(0, 1), BlockStatus(memAndDisk, 1L, 2L, 0L))
-    status2.addBlock(RDDBlockId(0, 2), BlockStatus(memAndDisk, 1L, 2L, 0L))
-    status2.addBlock(RDDBlockId(0, 3), BlockStatus(memAndDisk, 1L, 2L, 0L))
-    status2.addBlock(RDDBlockId(1, 0), BlockStatus(memAndDisk, 1L, 2L, 0L))
-    status2.addBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 1L, 2L, 0L))
-    status3.addBlock(RDDBlockId(0, 4), BlockStatus(memAndDisk, 1L, 2L, 0L))
-    status3.addBlock(RDDBlockId(1, 2), BlockStatus(memAndDisk, 1L, 2L, 0L))
+    status1.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L))
+    status1.addBlock(RDDBlockId(0, 1), BlockStatus(memAndDisk, 1L, 2L))
+    status2.addBlock(RDDBlockId(0, 2), BlockStatus(memAndDisk, 1L, 2L))
+    status2.addBlock(RDDBlockId(0, 3), BlockStatus(memAndDisk, 1L, 2L))
+    status2.addBlock(RDDBlockId(1, 0), BlockStatus(memAndDisk, 1L, 2L))
+    status2.addBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 1L, 2L))
+    status3.addBlock(RDDBlockId(0, 4), BlockStatus(memAndDisk, 1L, 2L))
+    status3.addBlock(RDDBlockId(1, 2), BlockStatus(memAndDisk, 1L, 2L))
     Seq(status1, status2, status3)
   }
 
@@ -334,9 +315,9 @@ class StorageSuite extends SparkFunSuite {
 
   test("StorageUtils.getRddBlockLocations with multiple locations") {
     val storageStatuses = stockStorageStatuses
-    storageStatuses(0).addBlock(RDDBlockId(1, 0), BlockStatus(memAndDisk, 1L, 2L, 0L))
-    storageStatuses(0).addBlock(RDDBlockId(0, 4), BlockStatus(memAndDisk, 1L, 2L, 0L))
-    storageStatuses(2).addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L, 0L))
+    storageStatuses(0).addBlock(RDDBlockId(1, 0), BlockStatus(memAndDisk, 1L, 2L))
+    storageStatuses(0).addBlock(RDDBlockId(0, 4), BlockStatus(memAndDisk, 1L, 2L))
+    storageStatuses(2).addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L))
     val blockLocations0 = StorageUtils.getRddBlockLocations(0, storageStatuses)
     val blockLocations1 = StorageUtils.getRddBlockLocations(1, storageStatuses)
     assert(blockLocations0.size === 5)
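
For reference, StorageUtils.getRddBlockLocations folds a sequence of storage statuses down to a map from each of the RDD's blocks to the locations holding it, so a block registered on two block managers gets two entries; that is why blockLocations0 still covers five blocks here while (0, 0) and (0, 4) each gain a second location. A sketch under the same assumptions (the "host:port" rendering is assumed to come from BlockManagerId.hostPort):

    val statuses = stockStorageStatuses  // the three-manager fixture above
    statuses(2).addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L))

    // Block (0, 0) lives on "dog:1" in the fixture and on "cat:3" after
    // the extra addBlock, so it now reports both locations.
    val locations = StorageUtils.getRddBlockLocations(0, statuses)
    assert(locations(RDDBlockId(0, 0)).toSet === Set("dog:1", "cat:3"))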

