You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2014/10/30 23:18:01 UTC

git commit: [SPARK-4027][Streaming] WriteAheadLogBackedBlockRDD to read received either from BlockManager or WAL in HDFS

Repository: spark
Updated Branches:
  refs/heads/master 234de9232 -> fb1fbca20


[SPARK-4027][Streaming] WriteAheadLogBackedBlockRDD to read received either from BlockManager or WAL in HDFS

As part of the initiative to prevent data loss on streaming driver failure, this sub-task implements a BlockRDD that is backed by HDFS. This BlockRDD can read its data either from Spark's BlockManager or from file segments in the write ahead log in HDFS.

Most of this code has been written by @harishreedharan

Author: Tathagata Das <ta...@gmail.com>
Author: Hari Shreedharan <hs...@apache.org>

Closes #2931 from tdas/driver-ha-rdd and squashes the following commits:

209e49c [Tathagata Das] Better fix to style issue.
4a5866f [Tathagata Das] Addressed one more comment.
ed5fbf0 [Tathagata Das] Minor updates.
b0a18b1 [Tathagata Das] Fixed import order.
20aa7c6 [Tathagata Das] Fixed more line length issues.
29aa099 [Tathagata Das] Fixed line length issues.
9e47b5b [Tathagata Das] Renamed class, simplified+added unit tests.
6e1bfb8 [Tathagata Das] Tweaks testuite to create spark contxt lazily to prevent contxt leaks.
9c86a61 [Tathagata Das] Merge pull request #22 from harishreedharan/driver-ha-rdd
2878c38 [Hari Shreedharan] Shutdown spark context after tests. Formatting/minor fixes
c709f2f [Tathagata Das] Merge pull request #21 from harishreedharan/driver-ha-rdd
5cce16f [Hari Shreedharan] Make sure getBlockLocations uses offset and length to find the blocks on HDFS
eadde56 [Tathagata Das] Transferred HDFSBackedBlockRDD for the driver-ha-working branch


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb1fbca2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb1fbca2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb1fbca2

Branch: refs/heads/master
Commit: fb1fbca204250840ffdbc0fcbf80b8dfeebf9edb
Parents: 234de92
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Oct 30 15:17:02 2014 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Oct 30 15:17:02 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/BlockRDD.scala   |   4 +
 .../rdd/WriteAheadLogBackedBlockRDD.scala       | 125 +++++++++++++++
 .../apache/spark/streaming/util/HdfsUtils.scala |   8 +-
 .../rdd/WriteAheadLogBackedBlockRDDSuite.scala  | 151 +++++++++++++++++++
 4 files changed, 285 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fb1fbca2/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 2673ec2..fffa191 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -84,5 +84,9 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
         "Attempted to use %s after its blocks have been removed!".format(toString))
     }
   }
+
+  /** Exposes the block-id-to-locations map held in `locations_` to subclasses. */
+  protected def getBlockIdLocations(): Map[BlockId, Seq[String]] =
+    locations_
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fb1fbca2/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
new file mode 100644
index 0000000..23295bf
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark._
+import org.apache.spark.rdd.BlockRDD
+import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.streaming.util.{HdfsUtils, WriteAheadLogFileSegment, WriteAheadLogRandomReader}
+
+/**
+ * Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
+ * It contains information about the id of the block having this partition's data and
+ * the segment of the write ahead log that backs the partition.
+ * @param index index of the partition
+ * @param blockId id of the block having the partition data
+ * @param segment segment of the write ahead log having the partition data
+ */
+private[streaming]
+class WriteAheadLogBackedBlockRDDPartition(
+    val index: Int,
+    val blockId: BlockId,
+    val segment: WriteAheadLogFileSegment)
+  extends Partition
+
+
+/**
+ * This class represents a special case of the BlockRDD where the data blocks in
+ * the block manager are also backed by segments in write ahead logs. For reading
+ * the data, this RDD first looks up the blocks by their ids in the block manager.
+ * If it does not find them, it looks up the corresponding file segment.
+ *
+ * @param sc SparkContext
+ * @param hadoopConfig Hadoop configuration
+ * @param blockIds Ids of the blocks that contain this RDD's data
+ * @param segments Segments in write ahead logs that contain this RDD's data
+ * @param storeInBlockManager Whether to store in the block manager after reading from the segment
+ * @param storageLevel storage level to store when storing in block manager
+ *                     (applicable when storeInBlockManager = true)
+ */
+private[streaming]
+class WriteAheadLogBackedBlockRDD[T: ClassTag](
+    @transient sc: SparkContext,
+    @transient hadoopConfig: Configuration,
+    @transient blockIds: Array[BlockId],
+    @transient segments: Array[WriteAheadLogFileSegment],
+    storeInBlockManager: Boolean,
+    storageLevel: StorageLevel)
+  extends BlockRDD[T](sc, blockIds) {
+
+  require(
+    blockIds.length == segments.length,
+    s"Number of block ids (${blockIds.length}) must be " +
+      s"the same as number of segments (${segments.length})!")
+
+  // Hadoop configuration is not serializable, so wrap it in a SerializableWritable.
+  private val broadcastedHadoopConf = new SerializableWritable(hadoopConfig)
+
+  override def getPartitions: Array[Partition] = {
+    assertValid()
+    Array.tabulate(blockIds.size) { i =>
+      new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i))
+    }
+  }
+
+  /**
+   * Gets the partition data by getting the corresponding block from the block manager.
+   * If the block does not exist, then the data is read from the corresponding segment
+   * in write ahead log files.
+   */
+  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+    assertValid()
+    val hadoopConf = broadcastedHadoopConf.value
+    val blockManager = SparkEnv.get.blockManager
+    val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
+    val blockId = partition.blockId
+    blockManager.get(blockId) match {
+      case Some(block) => // Data is in Block Manager
+        val iterator = block.data.asInstanceOf[Iterator[T]]
+        logDebug(s"Read partition data of $this from block manager, block $blockId")
+        iterator
+      case None => // Data not found in Block Manager, grab it from write ahead log file
+        val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf)
+        // Always close the reader, even if the read throws, so that it is never leaked
+        val dataRead = try reader.read(partition.segment) finally reader.close()
+        logInfo(s"Read partition data of $this from write ahead log, segment ${partition.segment}")
+        if (storeInBlockManager) {
+          blockManager.putBytes(blockId, dataRead, storageLevel)
+          logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
+          dataRead.rewind()
+        }
+        blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
+    }
+  }
+
+  /**
+   * Get the preferred location of the partition. This returns the locations of the block
+   * if it is present in the block manager, else it returns the location of the
+   * corresponding segment in HDFS.
+   */
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
+    val blockLocations = getBlockIdLocations().get(partition.blockId)
+    def segmentLocations = HdfsUtils.getFileSegmentLocations(
+      partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig)
+    blockLocations.getOrElse(segmentLocations)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/fb1fbca2/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
index 491f117..27a28ba 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -52,12 +52,14 @@ private[streaming] object HdfsUtils {
     }
   }
 
-  def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = {
+  /** Get the hosts of the HDFS blocks containing the given file segment (empty if none). */
+  def getFileSegmentLocations(
+      path: String, offset: Long, length: Long, conf: Configuration): Array[String] = {
     val dfsPath = new Path(path)
     val dfs = getFileSystemForPath(dfsPath, conf)
     val fileStatus = dfs.getFileStatus(dfsPath)
-    val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen))
-    blockLocs.map(_.flatMap(_.getHosts))
+    val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, offset, length))
+    blockLocs.map(_.flatMap(_.getHosts)).getOrElse(Array.empty)
   }
 
   def getFileSystemForPath(path: Path, conf: Configuration): FileSystem = {

http://git-wip-us.apache.org/repos/asf/spark/blob/fb1fbca2/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
new file mode 100644
index 0000000..1016024
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.rdd
+
+import java.io.File
+
+import scala.util.Random
+
+import com.google.common.io.Files
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter}
+
+class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
+  val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
+  val hadoopConf = new Configuration()
+
+  var sparkContext: SparkContext = null
+  var blockManager: BlockManager = null
+  var dir: File = null
+
+  override def beforeAll(): Unit = {
+    sparkContext = new SparkContext(conf)
+    blockManager = sparkContext.env.blockManager
+    dir = Files.createTempDir()
+  }
+
+  override def afterAll(): Unit = {
+    // Copied from LocalSparkContext; simpler than introducing a test dependency on core tests.
+    sparkContext.stop()
+    deleteRecursively(dir)
+    System.clearProperty("spark.driver.port")
+  }
+
+  /** Recursively deletes `file`; File.delete() alone cannot remove a non-empty directory. */
+  private def deleteRecursively(file: File): Unit = {
+    Option(file.listFiles()).foreach(_.foreach(deleteRecursively)) // null if not a directory
+    file.delete()
+  }
+
+  test("Read data available in block manager and write ahead log") {
+    testRDD(5, 5)
+  }
+
+  test("Read data available only in block manager, not in write ahead log") {
+    testRDD(5, 0)
+  }
+
+  test("Read data available only in write ahead log, not in block manager") {
+    testRDD(0, 5)
+  }
+
+  test("Read data available only in write ahead log, and test storing in block manager") {
+    testRDD(0, 5, testStoreInBM = true)
+  }
+
+  test("Read data with partially available in block manager, and rest in write ahead log") {
+    testRDD(3, 2)
+  }
+
+  /**
+   * Test the WriteAheadLogBackedBlockRDD, by writing some partitions of the data to block manager
+   * and the rest to a write ahead log, and then reading it all back using the RDD.
+   * It can also test if the partitions that were read from the log were again stored in
+   * block manager.
+   * @param numPartitionsInBM Number of partitions to write to the Block Manager
+   * @param numPartitionsInWAL Number of partitions to write to the Write Ahead Log
+   * @param testStoreInBM Test whether blocks read from log are stored back into block manager
+   */
+  private def testRDD(
+      numPartitionsInBM: Int, numPartitionsInWAL: Int, testStoreInBM: Boolean = false): Unit = {
+    val numBlocks = numPartitionsInBM + numPartitionsInWAL
+    val data = Seq.fill(numBlocks, 10)(Random.nextString(50))
+
+    // Put the necessary blocks in the block manager
+    val blockIds = Array.fill(numBlocks)(StreamBlockId(Random.nextInt(), Random.nextInt()))
+    data.zip(blockIds).take(numPartitionsInBM).foreach { case(block, blockId) =>
+      blockManager.putIterator(blockId, block.iterator, StorageLevel.MEMORY_ONLY_SER)
+    }
+
+    // Generate write ahead log segments
+    val segments = generateFakeSegments(numPartitionsInBM) ++
+      writeLogSegments(data.takeRight(numPartitionsInWAL), blockIds.takeRight(numPartitionsInWAL))
+
+    // Make sure that the left `numPartitionsInBM` blocks are in block manager, and others are not
+    require(
+      blockIds.take(numPartitionsInBM).forall(blockManager.get(_).nonEmpty),
+      "Expected blocks not in BlockManager")
+    require(
+      blockIds.takeRight(numPartitionsInWAL).forall(blockManager.get(_).isEmpty),
+      "Unexpected blocks in BlockManager")
+
+    // Make sure only the right `numPartitionsInWAL` blocks are in write ahead logs
+    require(
+      segments.takeRight(numPartitionsInWAL).forall(s =>
+        new File(s.path.stripPrefix("file://")).exists()),
+      "Expected blocks not in write ahead log")
+    require(
+      segments.take(numPartitionsInBM).forall(s =>
+        !new File(s.path.stripPrefix("file://")).exists()),
+      "Unexpected blocks in write ahead log")
+
+    // Create the RDD and verify whether the returned data is correct
+    val rdd = new WriteAheadLogBackedBlockRDD[String](sparkContext, hadoopConf, blockIds.toArray,
+      segments.toArray, storeInBlockManager = false, StorageLevel.MEMORY_ONLY)
+    assert(rdd.collect() === data.flatten)
+
+    if (testStoreInBM) {
+      val rdd2 = new WriteAheadLogBackedBlockRDD[String](sparkContext, hadoopConf, blockIds.toArray,
+        segments.toArray, storeInBlockManager = true, StorageLevel.MEMORY_ONLY)
+      assert(rdd2.collect() === data.flatten)
+      assert(
+        blockIds.forall(blockManager.get(_).nonEmpty),
+        "All blocks not found in block manager")
+    }
+  }
+
+  private def writeLogSegments(
+      blockData: Seq[Seq[String]],
+      blockIds: Seq[BlockId]): Seq[WriteAheadLogFileSegment] = {
+    require(blockData.size === blockIds.size)
+    // Alphanumeric name: Random.nextString may emit characters that are unsafe in file names
+    val logFile = new File(dir, Random.alphanumeric.take(10).mkString)
+    val writer = new WriteAheadLogWriter(logFile.toString, hadoopConf)
+    val segments = blockData.zip(blockIds).map { case (data, id) =>
+      writer.write(blockManager.dataSerialize(id, data.iterator))
+    }
+    writer.close()
+    segments
+  }
+
+  private def generateFakeSegments(count: Int): Seq[WriteAheadLogFileSegment] = {
+    Seq.fill(count)(new WriteAheadLogFileSegment("random", 0L, 0))
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org