Posted to commits@spark.apache.org by td...@apache.org on 2015/05/05 10:45:58 UTC

spark git commit: [SPARK-7139] [STREAMING] Allow received block metadata to be saved to WAL and recovered on driver failure

Repository: spark
Updated Branches:
  refs/heads/master c5790a2f7 -> 1854ac326


[SPARK-7139] [STREAMING] Allow received block metadata to be saved to WAL and recovered on driver failure

- Enabled ReceivedBlockTracker WAL by default
- Stored block metadata in the WAL
- Optimized WALBackedBlockRDD by skipping the block fetch when the block is known not to exist in the Spark executors
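
Taken together, these changes mean that on driver restart the block metadata comes
back from the WAL, and every recovered block is marked as no longer present in the
executors. A minimal sketch of that behavior in plain Scala, with hypothetical names
standing in for the real classes (not part of this commit):

    case class BlockMeta(
        blockId: String,
        walRecordHandle: Option[String],
        isBlockIdValid: Boolean = true)

    // On recovery, lower the validity flag on every replayed block: reads will
    // then go straight to the WAL, skipping the pointless BlockManager lookup.
    def recoverFromWAL(replayed: Seq[BlockMeta]): Seq[BlockMeta] =
      replayed.map(_.copy(isBlockIdValid = false))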

Author: Tathagata Das <ta...@gmail.com>

Closes #5732 from tdas/SPARK-7139 and squashes the following commits:

575476e [Tathagata Das] Added more tests to get 100% coverage of the WALBackedBlockRDD
19668ba [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7139
685fab3 [Tathagata Das] Addressed comments in PR
637bc9c [Tathagata Das] Changed segment to handle
466212c [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7139
5f67a59 [Tathagata Das] Fixed HdfsUtils to handle append in local file system
1bc5bc3 [Tathagata Das] Fixed bug on unexpected recovery
d06fa21 [Tathagata Das] Enabled ReceivedBlockTracker by default, stored block metadata and optimized block fetching in WALBackedBlockRDD


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1854ac32
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1854ac32
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1854ac32

Branch: refs/heads/master
Commit: 1854ac326a9cc6014817d8df30ed0458eee5d7d1
Parents: c5790a2
Author: Tathagata Das <ta...@gmail.com>
Authored: Tue May 5 01:45:19 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue May 5 01:45:19 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/BlockRDD.scala   |   8 +-
 .../dstream/ReceiverInputDStream.scala          |  49 +++---
 .../rdd/WriteAheadLogBackedBlockRDD.scala       | 156 ++++++++++++-------
 .../receiver/ReceiverSupervisorImpl.scala       |   2 +-
 .../streaming/scheduler/ReceivedBlockInfo.scala |  30 +++-
 .../scheduler/ReceivedBlockTracker.scala        |  24 ++-
 .../streaming/scheduler/ReceiverTracker.scala   |   1 +
 .../apache/spark/streaming/util/HdfsUtils.scala |   2 +-
 .../streaming/ReceivedBlockTrackerSuite.scala   |  55 ++++---
 .../rdd/WriteAheadLogBackedBlockRDDSuite.scala  | 107 ++++++++++---
 10 files changed, 281 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1854ac32/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 71578d1..9220302 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -31,7 +31,7 @@ private[spark]
 class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds: Array[BlockId])
   extends RDD[T](sc, Nil) {
 
-  @transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
+  @transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
   @volatile private var _isValid = true
 
   override def getPartitions: Array[Partition] = {
@@ -54,7 +54,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
     assertValid()
-    locations_(split.asInstanceOf[BlockRDDPartition].blockId)
+    _locations(split.asInstanceOf[BlockRDDPartition].blockId)
   }
 
   /**
@@ -79,14 +79,14 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
 
   /** Check if this BlockRDD is valid. If not valid, exception is thrown. */
   private[spark] def assertValid() {
-    if (!_isValid) {
+    if (!isValid) {
       throw new SparkException(
         "Attempted to use %s after its blocks have been removed!".format(toString))
     }
   }
 
   protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = {
-    locations_
+    _locations
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1854ac32/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index ba88416..15d9710 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -20,11 +20,11 @@ package org.apache.spark.streaming.dstream
 import scala.reflect.ClassTag
 
 import org.apache.spark.rdd.{BlockRDD, RDD}
-import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.storage.BlockId
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
-import org.apache.spark.streaming.receiver.{Receiver, WriteAheadLogBasedStoreResult}
-import org.apache.spark.streaming.scheduler.{InputInfo, ReceivedBlockInfo}
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.util.WriteAheadLogUtils
 
 /**
  * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -64,31 +64,30 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
       } else {
         // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
         // for this batch
-        val blockInfos =
-          ssc.scheduler.receiverTracker.getBlocksOfBatch(validTime).get(id).getOrElse(Seq.empty)
-        val blockStoreResults = blockInfos.map { _.blockStoreResult }
-        val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray
+        val receiverTracker = ssc.scheduler.receiverTracker
+        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
+        val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
 
-        // Register the input blocks information into InputInfoTracker
-        val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum)
-        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
+        // Are WAL record handles present for all the blocks?
+        val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
 
-        // Check whether all the results are of the same type
-        val resultTypes = blockStoreResults.map { _.getClass }.distinct
-        if (resultTypes.size > 1) {
-          logWarning("Multiple result types in block information, WAL information will be ignored.")
-        }
-
-        // If all the results are of type WriteAheadLogBasedStoreResult, then create
-        // WriteAheadLogBackedBlockRDD else create simple BlockRDD.
-        if (resultTypes.size == 1 && resultTypes.head == classOf[WriteAheadLogBasedStoreResult]) {
-          val logSegments = blockStoreResults.map {
-            _.asInstanceOf[WriteAheadLogBasedStoreResult].walRecordHandle
-          }.toArray
-          // Since storeInBlockManager = false, the storage level does not matter.
-          new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,
-            blockIds, logSegments, storeInBlockManager = false, StorageLevel.MEMORY_ONLY_SER)
+        if (areWALRecordHandlesPresent) {
+          // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
+          val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
+          val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
+          new WriteAheadLogBackedBlockRDD[T](
+            ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
         } else {
+          // Else, create a plain BlockRDD. However, if some blocks have WAL info and others
+          // do not, that is unexpected, so log accordingly.
+          if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
+            if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
+              logError("Some blocks do not have Write Ahead Log information; " +
+                "this is unexpected and data may not be recoverable after driver failures")
+            } else {
+              logWarning("Some blocks have Write Ahead Log information; this is unexpected")
+            }
+          }
           new BlockRDD[T](ssc.sc, blockIds)
         }
       }
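
The forall check above is the crux of the new logic: a WAL-backed RDD is created
only when every block in the batch carries a WAL record handle. A self-contained
sketch of the same decision, with a hypothetical Info type standing in for
ReceivedBlockInfo:

    case class Info(walRecordHandleOption: Option[String])

    // A mix of blocks with and without WAL handles is unexpected; fall back to
    // a plain BlockRDD in that case (the real code also logs a warning/error).
    def chooseRDD(blockInfos: Seq[Info]): String =
      if (blockInfos.forall(_.walRecordHandleOption.nonEmpty)) "WriteAheadLogBackedBlockRDD"
      else "BlockRDD"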

http://git-wip-us.apache.org/repos/asf/spark/blob/1854ac32/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index f4c8046..ffce6a4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -23,6 +23,8 @@ import java.util.UUID
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
+import org.apache.commons.io.FileUtils
+
 import org.apache.spark._
 import org.apache.spark.rdd.BlockRDD
 import org.apache.spark.storage.{BlockId, StorageLevel}
@@ -31,30 +33,42 @@ import org.apache.spark.streaming.util._
 /**
  * Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
  * It contains information about the id of the blocks having this partition's data and
- * the segment of the write ahead log that backs the partition.
+ * the corresponding record handle in the write ahead log that backs the partition.
  * @param index index of the partition
  * @param blockId id of the block having the partition data
+ * @param isBlockIdValid Whether the block id is valid (i.e., the block is present in the
+ *                         Spark executors). If not, the lookup of the block by its id will
+ *                         be skipped.
  * @param walRecordHandle Handle of the record in a write ahead log having the partition data
  */
 private[streaming]
 class WriteAheadLogBackedBlockRDDPartition(
     val index: Int,
     val blockId: BlockId,
-    val walRecordHandle: WriteAheadLogRecordHandle)
-  extends Partition
+    val isBlockIdValid: Boolean,
+    val walRecordHandle: WriteAheadLogRecordHandle
+  ) extends Partition
 
 
 /**
  * This class represents a special case of the BlockRDD where the data blocks in
  * the block manager are also backed by data in write ahead logs. For reading
  * the data, this RDD first looks up the blocks by their ids in the block manager.
- * If it does not find them, it looks up the corresponding data in the write ahead log.
+ * If it does not find them, it looks up the WAL using the corresponding record handle.
+ * The lookup of the blocks from the block manager can be skipped by setting the corresponding
+ * element in isBlockIdValid to false. This is a performance optimization which does not affect
+ * correctness, and it can be used in situations where it is known that the block
+ * does not exist in the Spark executors (e.g. after a failed driver is restarted).
  *
  * @param sc SparkContext
  * @param blockIds Ids of the blocks that contains this RDD's data
  * @param walRecordHandles Record handles in write ahead logs that contain this RDD's data
- * @param storeInBlockManager Whether to store in the block manager after reading
- *                            from the WAL record
+ * @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark
+ *                         executors). If not, then block lookups by the block ids will be skipped.
+ *                         By default, this is an empty array signifying true for all the blocks.
+ * @param storeInBlockManager Whether to store a block in the block manager
+ *                            after reading it from the WAL
  * @param storageLevel storage level to store when storing in block manager
  *                     (applicable when storeInBlockManager = true)
  */
@@ -63,23 +77,32 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
     @transient sc: SparkContext,
     @transient blockIds: Array[BlockId],
     @transient walRecordHandles: Array[WriteAheadLogRecordHandle],
-    storeInBlockManager: Boolean,
-    storageLevel: StorageLevel)
+    @transient isBlockIdValid: Array[Boolean] = Array.empty,
+    storeInBlockManager: Boolean = false,
+    storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER)
   extends BlockRDD[T](sc, blockIds) {
 
   require(
     blockIds.length == walRecordHandles.length,
-    s"Number of block ids (${blockIds.length}) must be " +
-      s"the same as number of WAL record handles (${walRecordHandles.length}})!")
+    s"Number of block Ids (${blockIds.length}) must be " +
+      s" same as number of WAL record handles (${walRecordHandles.length}})")
+
+  require(
+    isBlockIdValid.isEmpty || isBlockIdValid.length == blockIds.length,
+    s"Number of elements in isBlockIdValid (${isBlockIdValid.length}) must be " +
+      s" same as number of block Ids (${blockIds.length})")
 
  // Hadoop configuration is not serializable, so wrap it in a SerializableWritable before use.
   @transient private val hadoopConfig = sc.hadoopConfiguration
   private val broadcastedHadoopConf = new SerializableWritable(hadoopConfig)
 
+  override def isValid(): Boolean = true
+
   override def getPartitions: Array[Partition] = {
     assertValid()
-    Array.tabulate(blockIds.size) { i =>
-      new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), walRecordHandles(i))
+    Array.tabulate(blockIds.length) { i =>
+      val isValid = if (isBlockIdValid.length == 0) true else isBlockIdValid(i)
+      new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), isValid, walRecordHandles(i))
     }
   }
 
@@ -94,51 +117,57 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
     val blockManager = SparkEnv.get.blockManager
     val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
     val blockId = partition.blockId
-    blockManager.get(blockId) match {
-      case Some(block) => // Data is in Block Manager
-        val iterator = block.data.asInstanceOf[Iterator[T]]
-        logDebug(s"Read partition data of $this from block manager, block $blockId")
-        iterator
-      case None => // Data not found in Block Manager, grab it from write ahead log file
-        var dataRead: ByteBuffer = null
-        var writeAheadLog: WriteAheadLog = null
-        try {
-          // The WriteAheadLogUtils.createLog*** method needs a directory to create a
-          // WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
-          // writing log data. However, the directory is not needed if data needs to be read, hence
-          // a dummy path is provided to satisfy the method parameter requirements.
-          // FileBasedWriteAheadLog will not create any file or directory at that path. Also,
-          // this dummy directory should not already exist otherwise the WAL will try to recover
-          // past events from the directory and throw errors.
-          val nonExistentDirectory = new File(
-            System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
-          writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
-            SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
-          dataRead = writeAheadLog.read(partition.walRecordHandle)
-        } catch {
-          case NonFatal(e) =>
-            throw new SparkException(
-              s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
-        } finally {
-          if (writeAheadLog != null) {
-            writeAheadLog.close()
-            writeAheadLog = null
-          }
-        }
-        if (dataRead == null) {
+
+    def getBlockFromBlockManager(): Option[Iterator[T]] = {
+      blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
+    }
+
+    def getBlockFromWriteAheadLog(): Iterator[T] = {
+      var dataRead: ByteBuffer = null
+      var writeAheadLog: WriteAheadLog = null
+      try {
+        // The WriteAheadLogUtils.createLog*** method needs a directory to create a
+        // WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
+        // writing log data. However, the directory is not needed if data needs to be read, hence
+        // a dummy path is provided to satisfy the method parameter requirements.
+        // FileBasedWriteAheadLog will not create any file or directory at that path. Also,
+        // this dummy directory should not already exist otherwise the WAL will try to recover
+        // past events from the directory and throw errors.
+        val nonExistentDirectory = new File(
+          System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
+        writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
+          SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
+        dataRead = writeAheadLog.read(partition.walRecordHandle)
+      } catch {
+        case NonFatal(e) =>
           throw new SparkException(
-            s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
-              s"read returned null")
+            s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
+      } finally {
+        if (writeAheadLog != null) {
+          writeAheadLog.close()
+          writeAheadLog = null
         }
-        logInfo(s"Read partition data of $this from write ahead log, record handle " +
-          partition.walRecordHandle)
-        if (storeInBlockManager) {
-          blockManager.putBytes(blockId, dataRead, storageLevel)
-          logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
-          dataRead.rewind()
-        }
-        blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
+      }
+      if (dataRead == null) {
+        throw new SparkException(
+          s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
+            s"read returned null")
+      }
+      logInfo(s"Read partition data of $this from write ahead log, record handle " +
+        partition.walRecordHandle)
+      if (storeInBlockManager) {
+        blockManager.putBytes(blockId, dataRead, storageLevel)
+        logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
+        dataRead.rewind()
+      }
+      blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
+    }
+
+    if (partition.isBlockIdValid) {
+      getBlockFromBlockManager().getOrElse { getBlockFromWriteAheadLog() }
+    } else {
+      getBlockFromWriteAheadLog()
     }
   }
 
@@ -149,12 +178,23 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
    */
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
-    val blockLocations = getBlockIdLocations().get(partition.blockId)
+    val blockLocations = if (partition.isBlockIdValid) {
+      getBlockIdLocations().get(partition.blockId)
+    } else {
+      None
+    }
+
     blockLocations.getOrElse {
       partition.walRecordHandle match {
         case fileSegment: FileBasedWriteAheadLogSegment =>
-          HdfsUtils.getFileSegmentLocations(
-            fileSegment.path, fileSegment.offset, fileSegment.length, hadoopConfig)
+          try {
+            HdfsUtils.getFileSegmentLocations(
+              fileSegment.path, fileSegment.offset, fileSegment.length, hadoopConfig)
+          } catch {
+            case NonFatal(e) =>
+              logError("Error getting WAL file segment locations", e)
+              Seq.empty
+          }
         case _ =>
           Seq.empty
       }
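
The compute() refactor above boils down to a try-local-then-durable read path. A
minimal, self-contained sketch of that pattern (hypothetical signatures, not the
Spark API):

    // If the block id may still be valid, try the cheap BlockManager lookup
    // first and fall back to the WAL; if it is known to be invalid, skip
    // straight to the WAL read.
    def readPartition[T](
        isBlockIdValid: Boolean,
        fromBlockManager: () => Option[Iterator[T]],
        fromWAL: () => Iterator[T]): Iterator[T] = {
      if (isBlockIdValid) fromBlockManager().getOrElse(fromWAL())
      else fromWAL()
    }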

http://git-wip-us.apache.org/repos/asf/spark/blob/1854ac32/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 93f047b..9293837 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -146,7 +146,7 @@ private[streaming] class ReceiverSupervisorImpl(
     val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
     logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
 
-    val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
+    val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
     trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
     logDebug(s"Reported block $blockId")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1854ac32/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala
index 94beb59..dc11e84 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala
@@ -17,12 +17,38 @@
 
 package org.apache.spark.streaming.scheduler
 
-import org.apache.spark.streaming.receiver.ReceivedBlockStoreResult
+import org.apache.spark.storage.StreamBlockId
+import org.apache.spark.streaming.receiver.{ReceivedBlockStoreResult, WriteAheadLogBasedStoreResult}
+import org.apache.spark.streaming.util.WriteAheadLogRecordHandle
 
 /** Information about blocks received by the receiver */
 private[streaming] case class ReceivedBlockInfo(
     streamId: Int,
     numRecords: Long,
+    metadataOption: Option[Any],
     blockStoreResult: ReceivedBlockStoreResult
-  )
+  ) {
+
+  @volatile private var _isBlockIdValid = true
+
+  def blockId: StreamBlockId = blockStoreResult.blockId
+
+  def walRecordHandleOption: Option[WriteAheadLogRecordHandle] = {
+    blockStoreResult match {
+      case walStoreResult: WriteAheadLogBasedStoreResult => Some(walStoreResult.walRecordHandle)
+      case _ => None
+    }
+  }
+
+  /** Whether the block ID is valid, i.e., whether the block is present in the Spark executors. */
+  def isBlockIdValid(): Boolean = _isBlockIdValid
+
+  /**
+   * Set the block ID as invalid. This is useful when it is known that the block is not present
+   * in the Spark executors.
+   */
+  def setBlockIdInvalid(): Unit = {
+    _isBlockIdValid = false
+  }
+}
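
Note that the validity flag on ReceivedBlockInfo is deliberately one-way: it starts
true and can only be lowered, so a @volatile field is enough for safe publication
without further synchronization. The idiom in isolation (a sketch, not the actual
class):

    // One-way flag: once invalidated, the block is never considered valid
    // again, so racing readers can only observe a conservative (false) value.
    class OneWayFlag {
      @volatile private var valid = true
      def isValid: Boolean = valid
      def invalidate(): Unit = { valid = false }
    }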
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1854ac32/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 14e769a..a9f4147 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -45,7 +45,7 @@ private[streaming] case class BatchCleanupEvent(times: Seq[Time])
 private[streaming]
 case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
   def getBlocksOfStream(streamId: Int): Seq[ReceivedBlockInfo] = {
-    streamIdToAllocatedBlocks.get(streamId).getOrElse(Seq.empty)
+    streamIdToAllocatedBlocks.getOrElse(streamId, Seq.empty)
   }
 }
 
@@ -63,6 +63,7 @@ private[streaming] class ReceivedBlockTracker(
     hadoopConf: Configuration,
     streamIds: Seq[Int],
     clock: Clock,
+    recoverFromWriteAheadLog: Boolean,
     checkpointDirOption: Option[String])
   extends Logging {
 
@@ -75,7 +76,9 @@ private[streaming] class ReceivedBlockTracker(
   private var lastAllocatedBatchTime: Time = null
 
   // Recover block information from write ahead logs
-  recoverFromWriteAheadLogs()
+  if (recoverFromWriteAheadLog) {
+    recoverPastEvents()
+  }
 
   /** Add received block. This event will get written to the write ahead log (if enabled). */
   def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
@@ -167,10 +170,11 @@ private[streaming] class ReceivedBlockTracker(
    * Recover all the tracker actions from the write ahead logs to recover the state (unallocated
    * and allocated block info) prior to failure.
    */
-  private def recoverFromWriteAheadLogs(): Unit = synchronized {
+  private def recoverPastEvents(): Unit = synchronized {
     // Insert the recovered block information
     def insertAddedBlock(receivedBlockInfo: ReceivedBlockInfo) {
       logTrace(s"Recovery: Inserting added block $receivedBlockInfo")
+      receivedBlockInfo.setBlockIdInvalid()
       getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
     }
 
@@ -224,19 +228,9 @@ private[streaming] class ReceivedBlockTracker(
 
   /** Optionally create the write ahead log manager only if the feature is enabled */
   private def createWriteAheadLog(): Option[WriteAheadLog] = {
-    if (WriteAheadLogUtils.enableReceiverLog(conf)) {
-      if (checkpointDirOption.isEmpty) {
-        throw new SparkException(
-          "Cannot enable receiver write-ahead log without checkpoint directory set. " +
-            "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
-            "See documentation for more details.")
-      }
+    checkpointDirOption.map { checkpointDir =>
       val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
-
-      val log = WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
-      Some(log)
-    } else {
-      None
+      WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
     }
   }
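
The createWriteAheadLog() rewrite above replaces the explicit branching with
Option.map, so a missing checkpoint directory now simply disables the WAL instead
of throwing. A self-contained sketch of the shape of that refactor (the log
directory layout here is hypothetical):

    // None in, None out: no checkpoint directory means no write ahead log.
    def createLogDir(checkpointDirOption: Option[String]): Option[String] =
      checkpointDirOption.map(dir => dir + "/receivedBlockMetadata")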
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1854ac32/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 1af6571..3c34139 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -62,6 +62,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     ssc.sparkContext.hadoopConfiguration,
     receiverInputStreamIds,
     ssc.scheduler.clock,
+    ssc.isCheckpointPresent,
     Option(ssc.checkpointDir)
   )
   private val listenerBus = ssc.scheduler.listenerBus

http://git-wip-us.apache.org/repos/asf/spark/blob/1854ac32/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
index 858ba3c..f60688f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -27,7 +27,7 @@ private[streaming] object HdfsUtils {
     // If the file exists and we have append support, append instead of creating a new file
     val stream: FSDataOutputStream = {
       if (dfs.isFile(dfsPath)) {
-        if (conf.getBoolean("hdfs.append.support", false)) {
+        if (conf.getBoolean("hdfs.append.support", false) || dfs.isInstanceOf[RawLocalFileSystem]) {
           dfs.append(dfsPath)
         } else {
           throw new IllegalStateException("File exists and there is no append support!")
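
The HdfsUtils change above widens the append check: local file systems do not
advertise "hdfs.append.support", so RawLocalFileSystem is special-cased, which is
what lets the WAL append to files on the local file system (per the commit note
"Fixed HdfsUtils to handle append in local file system"). A self-contained sketch
of the decision, with simplified stand-ins rather than the Hadoop API:

    // Append when the file exists and appending is possible; create when it
    // does not exist; otherwise fail loudly.
    def openMode(fileExists: Boolean, appendSupported: Boolean, isLocalFs: Boolean): String =
      if (!fileExists) "create"
      else if (appendSupported || isLocalFs) "append"
      else throw new IllegalStateException("File exists and there is no append support!")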

http://git-wip-us.apache.org/repos/asf/spark/blob/1854ac32/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 8317fb9..b1af8d5 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -67,15 +67,20 @@ class ReceivedBlockTrackerSuite
 
     // Verify added blocks are unallocated blocks
     receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
+    receivedBlockTracker.hasUnallocatedReceivedBlocks should be (true)
+
 
     // Allocate the blocks to a batch and verify that all of them have been allocated
     receivedBlockTracker.allocateBlocksToBatch(1)
     receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual blockInfos
+    receivedBlockTracker.getBlocksOfBatch(1) shouldEqual Map(streamId -> blockInfos)
     receivedBlockTracker.getUnallocatedBlocks(streamId) shouldBe empty
+    receivedBlockTracker.hasUnallocatedReceivedBlocks should be (false)
 
     // Allocate no blocks to another batch
     receivedBlockTracker.allocateBlocksToBatch(2)
     receivedBlockTracker.getBlocksOfBatchAndStream(2, streamId) shouldBe empty
+    receivedBlockTracker.getBlocksOfBatch(2) shouldEqual Map(streamId -> Seq.empty)
 
     // Verify that older batches have no operation on batch allocation,
     // will return the same blocks as previously allocated.
@@ -88,7 +93,7 @@ class ReceivedBlockTrackerSuite
     receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
   }
 
-  test("block addition, block to batch allocation and clean up with write ahead log") {
+  test("recovery and cleanup with write ahead logs") {
     val manualClock = new ManualClock
     // Set the time increment level to twice the rotation interval so that every increment creates
     // a new log file
@@ -114,9 +119,7 @@ class ReceivedBlockTrackerSuite
     }
 
     // Set WAL configuration
-    conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
     conf.set("spark.streaming.driver.writeAheadLog.rollingIntervalSecs", "1")
-    require(WriteAheadLogUtils.enableReceiverLog(conf))
     require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = true) === 1)
 
     // Start tracker and add blocks
@@ -131,15 +134,27 @@ class ReceivedBlockTrackerSuite
     getWrittenLogData() shouldEqual expectedWrittenData1
     getWriteAheadLogFiles() should have size 1
 
-    // Restart tracker and verify recovered list of unallocated blocks
     incrementTime()
-    val tracker2 = createTracker(clock = manualClock)
-    tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
+
+    // Restart the tracker without WAL recovery and verify that the list of unallocated blocks is empty
+    val tracker1_ = createTracker(clock = manualClock, recoverFromWriteAheadLog = false)
+    tracker1_.getUnallocatedBlocks(streamId) shouldBe empty
+    tracker1_.hasUnallocatedReceivedBlocks should be (false)
+
+    // Restart tracker and verify recovered list of unallocated blocks
+    val tracker2 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
+    val unallocatedBlocks = tracker2.getUnallocatedBlocks(streamId).toList
+    unallocatedBlocks shouldEqual blockInfos1
+    unallocatedBlocks.foreach { block =>
+      block.isBlockIdValid() should be (false)
+    }
+
 
     // Allocate blocks to batch and verify whether the unallocated blocks got allocated
     val batchTime1 = manualClock.getTimeMillis()
     tracker2.allocateBlocksToBatch(batchTime1)
     tracker2.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1
+    tracker2.getBlocksOfBatch(batchTime1) shouldEqual Map(streamId -> blockInfos1)
 
     // Add more blocks and allocate to another batch
     incrementTime()
@@ -157,7 +172,7 @@ class ReceivedBlockTrackerSuite
 
     // Restart tracker and verify recovered state
     incrementTime()
-    val tracker3 = createTracker(clock = manualClock)
+    val tracker3 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
     tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1
     tracker3.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
     tracker3.getUnallocatedBlocks(streamId) shouldBe empty
@@ -180,28 +195,16 @@ class ReceivedBlockTrackerSuite
     // Restart tracker and verify recovered state, specifically whether info about the first
     // batch has been removed, but not the second batch
     incrementTime()
-    val tracker4 = createTracker(clock = manualClock)
+    val tracker4 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
     tracker4.getUnallocatedBlocks(streamId) shouldBe empty
     tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty  // should be cleaned
     tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
   }
 
-  test("enabling write ahead log but not setting checkpoint dir") {
-    conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
-    intercept[SparkException] {
-      createTracker(setCheckpointDir = false)
-    }
-  }
-
-  test("setting checkpoint dir but not enabling write ahead log") {
-    // When WAL config is not set, log manager should not be enabled
-    val tracker1 = createTracker(setCheckpointDir = true)
+  test("disable write ahead log when checkpoint directory is not set") {
+    // When the checkpoint directory is not set, the write ahead log is disabled
+    val tracker1 = createTracker(setCheckpointDir = false)
     tracker1.isWriteAheadLogEnabled should be (false)
-
-    // When WAL is explicitly disabled, log manager should not be enabled
-    conf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
-    val tracker2 = createTracker(setCheckpointDir = true)
-    tracker2.isWriteAheadLogEnabled should be(false)
   }
 
   /**
@@ -210,16 +213,18 @@ class ReceivedBlockTrackerSuite
    */
   def createTracker(
       setCheckpointDir: Boolean = true,
+      recoverFromWriteAheadLog: Boolean = false,
       clock: Clock = new SystemClock): ReceivedBlockTracker = {
     val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
-    val tracker = new ReceivedBlockTracker(conf, hadoopConf, Seq(streamId), clock, cpDirOption)
+    val tracker = new ReceivedBlockTracker(
+      conf, hadoopConf, Seq(streamId), clock, recoverFromWriteAheadLog, cpDirOption)
     allReceivedBlockTrackers += tracker
     tracker
   }
 
  /** Generate block infos using random ids */
   def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
-    List.fill(5)(ReceivedBlockInfo(streamId, 0,
+    List.fill(5)(ReceivedBlockInfo(streamId, 0, None,
       BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)))))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1854ac32/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index 8b300d8..6859b65 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
 import org.apache.spark.streaming.util.{FileBasedWriteAheadLogSegment, FileBasedWriteAheadLogWriter}
 import org.apache.spark.util.Utils
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{SparkConf, SparkContext, SparkException}
 
 class WriteAheadLogBackedBlockRDDSuite
   extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
@@ -60,24 +60,35 @@ class WriteAheadLogBackedBlockRDDSuite
     System.clearProperty("spark.driver.port")
   }
 
-  test("Read data available in block manager and write ahead log") {
-    testRDD(5, 5)
+  test("Read data available in both block manager and write ahead log") {
+    testRDD(numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 5)
   }
 
   test("Read data available only in block manager, not in write ahead log") {
-    testRDD(5, 0)
+    testRDD(numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 0)
   }
 
   test("Read data available only in write ahead log, not in block manager") {
-    testRDD(0, 5)
+    testRDD(numPartitions = 5, numPartitionsInBM = 0, numPartitionsInWAL = 5)
   }
 
-  test("Read data available only in write ahead log, and test storing in block manager") {
-    testRDD(0, 5, testStoreInBM = true)
+  test("Read data with partially available in block manager, and rest in write ahead log") {
+    testRDD(numPartitions = 5, numPartitionsInBM = 3, numPartitionsInWAL = 2)
   }
 
-  test("Read data with partially available in block manager, and rest in write ahead log") {
-    testRDD(3, 2)
+  test("Test isBlockValid skips block fetching from BlockManager") {
+    testRDD(
+      numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 0, testIsBlockValid = true)
+  }
+
+  test("Test whether RDD is valid after removing blocks from block manager") {
+    testRDD(
+      numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 5, testBlockRemove = true)
+  }
+
+  test("Test storing of blocks recovered from write ahead log back into block manager") {
+    testRDD(
+      numPartitions = 5, numPartitionsInBM = 0, numPartitionsInWAL = 5, testStoreInBM = true)
   }
 
   /**
@@ -85,23 +96,52 @@ class WriteAheadLogBackedBlockRDDSuite
   * and the rest to a write ahead log, and then reading it all back using the RDD.
    * It can also test if the partitions that were read from the log were again stored in
    * block manager.
-   * @param numPartitionsInBM Number of partitions to write to the Block Manager
-   * @param numPartitionsInWAL Number of partitions to write to the Write Ahead Log
-   * @param testStoreInBM Test whether blocks read from log are stored back into block manager
+   *
+   * @param numPartitions Number of partitions in RDD
+   * @param numPartitionsInBM Number of partitions to write to the BlockManager.
+   *                          Partitions 0 to (numPartitionsInBM-1) will be written to BlockManager
+   * @param numPartitionsInWAL Number of partitions to write to the Write Ahead Log.
+   *                           Partitions (numPartitions - numPartitionsInWAL) to
+   *                           (numPartitions - 1) will be written to WAL
+   * @param testIsBlockValid Test whether setting isBlockValid to false skips block fetching
+   * @param testBlockRemove Test whether calling rdd.removeBlock() makes the RDD still usable with
+   *                        reads falling back to the WAL
+   * @param testStoreInBM   Test whether blocks read from log are stored back into block manager
+   *
+   * Example with numPartitions = 5, numPartitionsInBM = 3, and numPartitionsInWAL = 4
+   *
+   *   numPartitionsInBM = 3
+   *   |------------------|
+   *   |                  |
+   *    0       1       2       3       4
+   *           |                         |
+   *           |-------------------------|
+   *              numPartitionsInWAL = 4
    */
   private def testRDD(
-      numPartitionsInBM: Int, numPartitionsInWAL: Int, testStoreInBM: Boolean = false) {
-    val numBlocks = numPartitionsInBM + numPartitionsInWAL
-    val data = Seq.fill(numBlocks, 10)(scala.util.Random.nextString(50))
+      numPartitions: Int,
+      numPartitionsInBM: Int,
+      numPartitionsInWAL: Int,
+      testIsBlockValid: Boolean = false,
+      testBlockRemove: Boolean = false,
+      testStoreInBM: Boolean = false
+    ) {
+    require(numPartitionsInBM <= numPartitions,
+      "Can't put more partitions in BlockManager than that in RDD")
+    require(numPartitionsInWAL <= numPartitions,
+      "Can't put more partitions in write ahead log than that in RDD")
+    val data = Seq.fill(numPartitions, 10)(scala.util.Random.nextString(50))
 
     // Put the necessary blocks in the block manager
-    val blockIds = Array.fill(numBlocks)(StreamBlockId(Random.nextInt(), Random.nextInt()))
+    val blockIds = Array.fill(numPartitions)(StreamBlockId(Random.nextInt(), Random.nextInt()))
     data.zip(blockIds).take(numPartitionsInBM).foreach { case(block, blockId) =>
       blockManager.putIterator(blockId, block.iterator, StorageLevel.MEMORY_ONLY_SER)
     }
 
-    // Generate write ahead log file segments
-    val recordHandles = generateFakeRecordHandles(numPartitionsInBM) ++
+    // Generate write ahead log record handles
+    val recordHandles = generateFakeRecordHandles(numPartitions - numPartitionsInWAL) ++
       generateWALRecordHandles(data.takeRight(numPartitionsInWAL),
         blockIds.takeRight(numPartitionsInWAL))
 
@@ -111,7 +151,7 @@ class WriteAheadLogBackedBlockRDDSuite
       "Expected blocks not in BlockManager"
     )
     require(
-      blockIds.takeRight(numPartitionsInWAL).forall(blockManager.get(_).isEmpty),
+      blockIds.takeRight(numPartitions - numPartitionsInBM).forall(blockManager.get(_).isEmpty),
       "Unexpected blocks in BlockManager"
     )
 
@@ -122,19 +162,42 @@ class WriteAheadLogBackedBlockRDDSuite
       "Expected blocks not in write ahead log"
     )
     require(
-      recordHandles.take(numPartitionsInBM).forall(s =>
+      recordHandles.take(numPartitions - numPartitionsInWAL).forall(s =>
         !new File(s.path.stripPrefix("file://")).exists()),
       "Unexpected blocks in write ahead log"
     )
 
     // Create the RDD and verify whether the returned data is correct
     val rdd = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
-      recordHandles.toArray, storeInBlockManager = false, StorageLevel.MEMORY_ONLY)
+      recordHandles.toArray, storeInBlockManager = false)
     assert(rdd.collect() === data.flatten)
 
+    // Verify that block fetching is skipped when isBlockIdValid is set to false.
+    // This is done by using an RDD whose data is only in memory but which is set to skip block
+    // fetching. Using that RDD will throw an exception, as it skips block fetching even when the
+    // blocks are present in the BlockManager.
+    if (testIsBlockValid) {
+      require(numPartitionsInBM === numPartitions, "All partitions must be in BlockManager")
+      require(numPartitionsInWAL === 0, "No partitions must be in WAL")
+      val rdd2 = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
+        recordHandles.toArray, isBlockIdValid = Array.fill(blockIds.length)(false))
+      intercept[SparkException] {
+        rdd2.collect()
+      }
+    }
+
+    // Verify that the RDD is still valid after the blocks are removed, and that it can still
+    // read data from the write ahead log
+    if (testBlockRemove) {
+      require(numPartitions === numPartitionsInWAL, "All partitions must be in WAL for this test")
+      require(numPartitionsInBM > 0, "Some partitions must be in BlockManager for this test")
+      rdd.removeBlocks()
+      assert(rdd.collect() === data.flatten)
+    }
+
     if (testStoreInBM) {
       val rdd2 = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
-        recordHandles.toArray, storeInBlockManager = true, StorageLevel.MEMORY_ONLY)
+        recordHandles.toArray, storeInBlockManager = true, storageLevel = StorageLevel.MEMORY_ONLY)
       assert(rdd2.collect() === data.flatten)
       assert(
         blockIds.forall(blockManager.get(_).nonEmpty),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org