You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2014/10/30 22:51:17 UTC

git commit: [SPARK-4028][Streaming] ReceivedBlockHandler interface to abstract the functionality of storage of received data

Repository: spark
Updated Branches:
  refs/heads/master d9327192e -> 234de9232


[SPARK-4028][Streaming] ReceivedBlockHandler interface to abstract the functionality of storage of received data

As part of the initiative to prevent data loss on streaming driver failure, this JIRA tracks the subtask of implementing a ReceivedBlockHandler, that abstracts the functionality of storage of received data blocks. The default implementation will maintain the current behavior of storing the data into BlockManager. The optional implementation will store the data to both BlockManager as well as a write ahead log.

Author: Tathagata Das <ta...@gmail.com>

Closes #2940 from tdas/driver-ha-rbh and squashes the following commits:

78a4aaa [Tathagata Das] Fixed bug causing test failures.
f192f47 [Tathagata Das] Fixed import order.
df5f320 [Tathagata Das] Updated code to use ReceivedBlockStoreResult as the return type for handler's storeBlock
33c30c9 [Tathagata Das] Added license, and organized imports.
2f025b3 [Tathagata Das] Updates based on PR comments.
18aec1e [Tathagata Das] Moved ReceivedBlockInfo back into spark.streaming.scheduler package
95a4987 [Tathagata Das] Added ReceivedBlockHandler and its associated tests


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/234de923
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/234de923
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/234de923

Branch: refs/heads/master
Commit: 234de9232bcfa212317a8073c4a82c3863b36b14
Parents: d932719
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Oct 30 14:51:13 2014 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Oct 30 14:51:13 2014 -0700

----------------------------------------------------------------------
 .../dstream/ReceiverInputDStream.scala          |   7 +-
 .../streaming/receiver/ReceivedBlock.scala      |  35 +++
 .../receiver/ReceivedBlockHandler.scala         | 193 ++++++++++++++
 .../receiver/ReceiverSupervisorImpl.scala       |  88 ++++---
 .../spark/streaming/scheduler/BatchInfo.scala   |   2 +-
 .../spark/streaming/scheduler/JobSet.scala      |   3 +-
 .../streaming/scheduler/ReceivedBlockInfo.scala |  28 ++
 .../streaming/scheduler/ReceiverTracker.scala   |  24 +-
 .../util/WriteAheadLogRandomReader.scala        |   1 -
 .../streaming/ReceivedBlockHandlerSuite.scala   | 258 +++++++++++++++++++
 .../streaming/util/WriteAheadLogSuite.scala     |  34 ++-
 11 files changed, 603 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/234de923/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 391e409..bb47d37 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -23,8 +23,9 @@ import scala.reflect.ClassTag
 import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.BlockId
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.receiver.{WriteAheadLogBasedStoreResult, BlockManagerBasedStoreResult, Receiver}
 import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
+import org.apache.spark.SparkException
 
 /**
  * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -65,10 +66,10 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
     if (validTime >= graph.startTime) {
       val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
       receivedBlockInfo(validTime) = blockInfo
-      val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
+      val blockIds = blockInfo.map { _.blockStoreResult.blockId.asInstanceOf[BlockId] }
       Some(new BlockRDD[T](ssc.sc, blockIds))
     } else {
-      Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
+      Some(new BlockRDD[T](ssc.sc, Array.empty))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/234de923/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala
new file mode 100644
index 0000000..47968af
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receiver
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.language.existentials
+
+/** Trait representing a received block */
+private[streaming] sealed trait ReceivedBlock
+
+/** class representing a block received as an ArrayBuffer */
+private[streaming] case class ArrayBufferBlock(arrayBuffer: ArrayBuffer[_]) extends ReceivedBlock
+
+/** class representing a block received as an Iterator */
+private[streaming] case class IteratorBlock(iterator: Iterator[_]) extends ReceivedBlock
+
+/** class representing a block received as an ByteBuffer */
+private[streaming] case class ByteBufferBlock(byteBuffer: ByteBuffer) extends ReceivedBlock

http://git-wip-us.apache.org/repos/asf/spark/blob/234de923/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
new file mode 100644
index 0000000..fdf9953
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receiver
+
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration._
+import scala.language.{existentials, postfixOps}
+
+import WriteAheadLogBasedBlockHandler._
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.storage._
+import org.apache.spark.streaming.util.{Clock, SystemClock, WriteAheadLogFileSegment, WriteAheadLogManager}
+import org.apache.spark.util.Utils
+
+/** Trait that represents the metadata related to storage of blocks */
+private[streaming] trait ReceivedBlockStoreResult {
+  def blockId: StreamBlockId  // Any implementation of this trait will store a block id
+}
+
+/** Trait that represents a class that handles the storage of blocks received by receiver */
+private[streaming] trait ReceivedBlockHandler {
+
+  /** Store a received block with the given block id and return related metadata */
+  def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult
+
+  /** Cleanup old blocks older than the given threshold time */
+  def cleanupOldBlock(threshTime: Long)
+}
+
+
+/**
+ * Implementation of [[org.apache.spark.streaming.receiver.ReceivedBlockStoreResult]]
+ * that stores the metadata related to storage of blocks using
+ * [[org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler]]
+ */
+private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId)
+  extends ReceivedBlockStoreResult
+
+
+/**
+ * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
+ * stores the received blocks into a block manager with the specified storage level.
+ */
+private[streaming] class BlockManagerBasedBlockHandler(
+    blockManager: BlockManager, storageLevel: StorageLevel)
+  extends ReceivedBlockHandler with Logging {
+  
+  def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
+    val putResult: Seq[(BlockId, BlockStatus)] = block match {
+      case ArrayBufferBlock(arrayBuffer) =>
+        blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
+      case IteratorBlock(iterator) =>
+        blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
+      case ByteBufferBlock(byteBuffer) =>
+        blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
+      case o =>
+        throw new SparkException(
+          s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
+    }
+    if (!putResult.map { _._1 }.contains(blockId)) {
+      throw new SparkException(
+        s"Could not store $blockId to block manager with storage level $storageLevel")
+    }
+    BlockManagerBasedStoreResult(blockId)
+  }
+
+  def cleanupOldBlock(threshTime: Long) {
+    // this is not used as blocks inserted into the BlockManager are cleared by DStream's clearing
+    // of BlockRDDs.
+  }
+}
+
+
+/**
+ * Implementation of [[org.apache.spark.streaming.receiver.ReceivedBlockStoreResult]]
+ * that stores the metadata related to storage of blocks using
+ * [[org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler]]
+ */
+private[streaming] case class WriteAheadLogBasedStoreResult(
+    blockId: StreamBlockId,
+    segment: WriteAheadLogFileSegment
+  ) extends ReceivedBlockStoreResult
+
+
+/**
+ * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
+ * stores the received blocks in both, a write ahead log and a block manager.
+ */
+private[streaming] class WriteAheadLogBasedBlockHandler(
+    blockManager: BlockManager,
+    streamId: Int,
+    storageLevel: StorageLevel,
+    conf: SparkConf,
+    hadoopConf: Configuration,
+    checkpointDir: String,
+    clock: Clock = new SystemClock
+  ) extends ReceivedBlockHandler with Logging {
+
+  private val blockStoreTimeout = conf.getInt(
+    "spark.streaming.receiver.blockStoreTimeout", 30).seconds
+  private val rollingInterval = conf.getInt(
+    "spark.streaming.receiver.writeAheadLog.rollingInterval", 60)
+  private val maxFailures = conf.getInt(
+    "spark.streaming.receiver.writeAheadLog.maxFailures", 3)
+
+  // Manages rolling log files
+  private val logManager = new WriteAheadLogManager(
+    checkpointDirToLogDir(checkpointDir, streamId),
+    hadoopConf, rollingInterval, maxFailures,
+    callerName = this.getClass.getSimpleName,
+    clock = clock
+  )
+
+  // For processing futures used in parallel block storing into block manager and write ahead log
+  // # threads = 2, so that both writing to BM and WAL can proceed in parallel
+  implicit private val executionContext = ExecutionContext.fromExecutorService(
+    Utils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))
+
+  /**
+   * This implementation stores the block into the block manager as well as a write ahead log.
+   * It does this in parallel, using Scala Futures, and returns only after the block has
+   * been stored in both places.
+   */
+  def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
+
+    // Serialize the block so that it can be inserted into both
+    val serializedBlock = block match {
+      case ArrayBufferBlock(arrayBuffer) =>
+        blockManager.dataSerialize(blockId, arrayBuffer.iterator)
+      case IteratorBlock(iterator) =>
+        blockManager.dataSerialize(blockId, iterator)
+      case ByteBufferBlock(byteBuffer) =>
+        byteBuffer
+      case _ =>
+        throw new Exception(s"Could not push $blockId to block manager, unexpected block type")
+    }
+
+    // Store the block in block manager
+    val storeInBlockManagerFuture = Future {
+      val putResult =
+        blockManager.putBytes(blockId, serializedBlock, storageLevel, tellMaster = true)
+      if (!putResult.map { _._1 }.contains(blockId)) {
+        throw new SparkException(
+          s"Could not store $blockId to block manager with storage level $storageLevel")
+      }
+    }
+
+    // Store the block in write ahead log
+    val storeInWriteAheadLogFuture = Future {
+      logManager.writeToLog(serializedBlock)
+    }
+
+    // Combine the futures, wait for both to complete, and return the write ahead log segment
+    val combinedFuture = for {
+      _ <- storeInBlockManagerFuture
+      fileSegment <- storeInWriteAheadLogFuture
+    } yield fileSegment
+    val segment = Await.result(combinedFuture, blockStoreTimeout)
+    WriteAheadLogBasedStoreResult(blockId, segment)
+  }
+
+  def cleanupOldBlock(threshTime: Long) {
+    logManager.cleanupOldLogs(threshTime)
+  }
+
+  def stop() {
+    logManager.stop()
+  }
+}
+
+private[streaming] object WriteAheadLogBasedBlockHandler {
+  def checkpointDirToLogDir(checkpointDir: String, streamId: Int): String = {
+    new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/234de923/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 53a3e62..5360412 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -25,16 +25,13 @@ import scala.concurrent.Await
 
 import akka.actor.{Actor, Props}
 import akka.pattern.ask
-
 import com.google.common.base.Throwables
-
-import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.streaming.scheduler._
-import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.{Logging, SparkEnv, SparkException}
 import org.apache.spark.storage.StreamBlockId
-import org.apache.spark.streaming.scheduler.DeregisterReceiver
-import org.apache.spark.streaming.scheduler.AddBlock
-import org.apache.spark.streaming.scheduler.RegisterReceiver
+import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.util.WriteAheadLogFileSegment
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
  * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -44,12 +41,26 @@ import org.apache.spark.streaming.scheduler.RegisterReceiver
  */
 private[streaming] class ReceiverSupervisorImpl(
     receiver: Receiver[_],
-    env: SparkEnv
+    env: SparkEnv,
+    hadoopConf: Configuration,
+    checkpointDirOption: Option[String]
   ) extends ReceiverSupervisor(receiver, env.conf) with Logging {
 
-  private val blockManager = env.blockManager
+  private val receivedBlockHandler: ReceivedBlockHandler = {
+    if (env.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
+      if (checkpointDirOption.isEmpty) {
+        throw new SparkException(
+          "Cannot enable receiver write-ahead log without checkpoint directory set. " +
+            "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
+            "See documentation for more details.")
+      }
+      new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
+        receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
+    } else {
+      new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
+    }
+  }
 
-  private val storageLevel = receiver.storageLevel
 
   /** Remote Akka actor for the ReceiverTracker */
   private val trackerActor = {
@@ -105,47 +116,50 @@ private[streaming] class ReceiverSupervisorImpl(
   /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
   def pushArrayBuffer(
       arrayBuffer: ArrayBuffer[_],
-      optionalMetadata: Option[Any],
-      optionalBlockId: Option[StreamBlockId]
+      metadataOption: Option[Any],
+      blockIdOption: Option[StreamBlockId]
     ) {
-    val blockId = optionalBlockId.getOrElse(nextBlockId)
-    val time = System.currentTimeMillis
-    blockManager.putArray(blockId, arrayBuffer.toArray[Any], storageLevel, tellMaster = true)
-    logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time)  + " ms")
-    reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
+    pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
   }
 
   /** Store a iterator of received data as a data block into Spark's memory. */
   def pushIterator(
       iterator: Iterator[_],
-      optionalMetadata: Option[Any],
-      optionalBlockId: Option[StreamBlockId]
+      metadataOption: Option[Any],
+      blockIdOption: Option[StreamBlockId]
     ) {
-    val blockId = optionalBlockId.getOrElse(nextBlockId)
-    val time = System.currentTimeMillis
-    blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
-    logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time)  + " ms")
-    reportPushedBlock(blockId, -1, optionalMetadata)
+    pushAndReportBlock(IteratorBlock(iterator), metadataOption, blockIdOption)
   }
 
   /** Store the bytes of received data as a data block into Spark's memory. */
   def pushBytes(
       bytes: ByteBuffer,
-      optionalMetadata: Option[Any],
-      optionalBlockId: Option[StreamBlockId]
+      metadataOption: Option[Any],
+      blockIdOption: Option[StreamBlockId]
     ) {
-    val blockId = optionalBlockId.getOrElse(nextBlockId)
-    val time = System.currentTimeMillis
-    blockManager.putBytes(blockId, bytes, storageLevel, tellMaster = true)
-    logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time)  + " ms")
-    reportPushedBlock(blockId, -1, optionalMetadata)
+    pushAndReportBlock(ByteBufferBlock(bytes), metadataOption, blockIdOption)
   }
 
-  /** Report pushed block */
-  def reportPushedBlock(blockId: StreamBlockId, numRecords: Long, optionalMetadata: Option[Any]) {
-    val blockInfo = ReceivedBlockInfo(streamId, blockId, numRecords, optionalMetadata.orNull)
-    trackerActor ! AddBlock(blockInfo)
-    logDebug("Reported block " + blockId)
+  /** Store block and report it to driver */
+  def pushAndReportBlock(
+      receivedBlock: ReceivedBlock,
+      metadataOption: Option[Any],
+      blockIdOption: Option[StreamBlockId]
+    ) {
+    val blockId = blockIdOption.getOrElse(nextBlockId)
+    val numRecords = receivedBlock match {
+      case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
+      case _ => -1
+    }
+
+    val time = System.currentTimeMillis
+    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
+    logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
+
+    val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
+    val future = trackerActor.ask(AddBlock(blockInfo))(askTimeout)
+    Await.result(future, askTimeout)
+    logDebug(s"Reported block $blockId")
   }
 
   /** Report error to the receiver tracker */

http://git-wip-us.apache.org/repos/asf/spark/blob/234de923/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index a68aecb..92dc113 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.streaming.scheduler
 
-import org.apache.spark.streaming.Time
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.streaming.Time
 
 /**
  * :: DeveloperApi ::

http://git-wip-us.apache.org/repos/asf/spark/blob/234de923/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index a69d743..8c15a75 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.streaming.scheduler
 
-import scala.collection.mutable.{ArrayBuffer, HashSet}
+import scala.collection.mutable.HashSet
+
 import org.apache.spark.streaming.Time
 
 /** Class representing a set of Jobs

http://git-wip-us.apache.org/repos/asf/spark/blob/234de923/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala
new file mode 100644
index 0000000..94beb59
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import org.apache.spark.streaming.receiver.ReceivedBlockStoreResult
+
+/** Information about blocks received by the receiver */
+private[streaming] case class ReceivedBlockInfo(
+    streamId: Int,
+    numRecords: Long,
+    blockStoreResult: ReceivedBlockStoreResult
+  )
+

http://git-wip-us.apache.org/repos/asf/spark/blob/234de923/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 7149dbc..d696563 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -21,21 +21,12 @@ import scala.collection.mutable.{HashMap, SynchronizedMap, SynchronizedQueue}
 import scala.language.existentials
 
 import akka.actor._
-import org.apache.spark.{Logging, SparkEnv, SparkException}
+import org.apache.spark.{SerializableWritable, Logging, SparkEnv, SparkException}
 import org.apache.spark.SparkContext._
-import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.streaming.receiver.{Receiver, ReceiverSupervisorImpl, StopReceiver}
 import org.apache.spark.util.AkkaUtils
 
-/** Information about blocks received by the receiver */
-private[streaming] case class ReceivedBlockInfo(
-    streamId: Int,
-    blockId: StreamBlockId,
-    numRecords: Long,
-    metadata: Any
-  )
-
 /**
  * Messages used by the NetworkReceiver and the ReceiverTracker to communicate
  * with each other.
@@ -153,7 +144,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
   def addBlocks(receivedBlockInfo: ReceivedBlockInfo) {
     getReceivedBlockInfoQueue(receivedBlockInfo.streamId) += receivedBlockInfo
     logDebug("Stream " + receivedBlockInfo.streamId + " received new blocks: " +
-      receivedBlockInfo.blockId)
+      receivedBlockInfo.blockStoreResult.blockId)
   }
 
   /** Report error sent by a receiver */
@@ -188,6 +179,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
         sender ! true
       case AddBlock(receivedBlockInfo) =>
         addBlocks(receivedBlockInfo)
+        sender ! true
       case ReportError(streamId, message, error) =>
         reportError(streamId, message, error)
       case DeregisterReceiver(streamId, message, error) =>
@@ -252,6 +244,9 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
           ssc.sc.makeRDD(receivers, receivers.size)
         }
 
+      val checkpointDirOption = Option(ssc.checkpointDir)
+      val serializableHadoopConf = new SerializableWritable(ssc.sparkContext.hadoopConfiguration)
+
       // Function to start the receiver on the worker node
       val startReceiver = (iterator: Iterator[Receiver[_]]) => {
         if (!iterator.hasNext) {
@@ -259,9 +254,10 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
             "Could not start receiver as object not found.")
         }
         val receiver = iterator.next()
-        val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
-        executor.start()
-        executor.awaitTermination()
+        val supervisor = new ReceiverSupervisorImpl(
+          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
+        supervisor.start()
+        supervisor.awaitTermination()
       }
       // Run the dummy Spark job to ensure that all slaves have registered.
       // This avoids all the receivers to be scheduled on the same node.

http://git-wip-us.apache.org/repos/asf/spark/blob/234de923/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
index 92bad7a..0039890 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
@@ -52,4 +52,3 @@ private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configura
     HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the file.")
   }
 }
-

http://git-wip-us.apache.org/repos/asf/spark/blob/234de923/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
new file mode 100644
index 0000000..ad1a6f0
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import java.io.File
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import akka.actor.{ActorSystem, Props}
+import com.google.common.io.Files
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark._
+import org.apache.spark.network.nio.NioBlockTransferService
+import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.storage._
+import org.apache.spark.streaming.receiver._
+import org.apache.spark.streaming.util._
+import org.apache.spark.util.AkkaUtils
+import WriteAheadLogBasedBlockHandler._
+import WriteAheadLogSuite._
+
+class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
+
+  val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1")
+  val hadoopConf = new Configuration()
+  val storageLevel = StorageLevel.MEMORY_ONLY_SER
+  val streamId = 1
+  val securityMgr = new SecurityManager(conf)
+  val mapOutputTracker = new MapOutputTrackerMaster(conf)
+  val shuffleManager = new HashShuffleManager(conf)
+  val serializer = new KryoSerializer(conf)
+  val manualClock = new ManualClock
+  val blockManagerSize = 10000000
+
+  var actorSystem: ActorSystem = null
+  var blockManagerMaster: BlockManagerMaster = null
+  var blockManager: BlockManager = null
+  var tempDirectory: File = null
+
+  before {
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+      "test", "localhost", 0, conf = conf, securityManager = securityMgr)
+    this.actorSystem = actorSystem
+    conf.set("spark.driver.port", boundPort.toString)
+
+    blockManagerMaster = new BlockManagerMaster(
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
+      conf, true)
+
+    blockManager = new BlockManager("bm", actorSystem, blockManagerMaster, serializer,
+      blockManagerSize, conf, mapOutputTracker, shuffleManager,
+      new NioBlockTransferService(conf, securityMgr))
+
+    tempDirectory = Files.createTempDir()
+    manualClock.setTime(0)
+  }
+
+  after {
+    if (blockManager != null) {
+      blockManager.stop()
+      blockManager = null
+    }
+    if (blockManagerMaster != null) {
+      blockManagerMaster.stop()
+      blockManagerMaster = null
+    }
+    actorSystem.shutdown()
+    actorSystem.awaitTermination()
+    actorSystem = null
+
+    if (tempDirectory != null && tempDirectory.exists()) {
+      FileUtils.deleteDirectory(tempDirectory)
+      tempDirectory = null
+    }
+  }
+
+  test("BlockManagerBasedBlockHandler - store blocks") {
+    withBlockManagerBasedBlockHandler { handler =>
+      testBlockStoring(handler) { case (data, blockIds, storeResults) =>
+        // Verify the data in block manager is correct
+        val storedData = blockIds.flatMap { blockId =>
+          blockManager.getLocal(blockId).map { _.data.map {_.toString}.toList }.getOrElse(List.empty)
+        }.toList
+        storedData shouldEqual data
+
+        // Verify that the store results are instances of BlockManagerBasedStoreResult
+        assert(
+          storeResults.forall { _.isInstanceOf[BlockManagerBasedStoreResult] },
+          "Unexpected store result type"
+        )
+      }
+    }
+  }
+
+  test("BlockManagerBasedBlockHandler - handle errors in storing block") {
+    withBlockManagerBasedBlockHandler { handler =>
+      testErrorHandling(handler)
+    }
+  }
+
+  test("WriteAheadLogBasedBlockHandler - store blocks") {
+    withWriteAheadLogBasedBlockHandler { handler =>
+      testBlockStoring(handler) { case (data, blockIds, storeResults) =>
+        // Verify the data in block manager is correct
+        val storedData = blockIds.flatMap { blockId =>
+          blockManager.getLocal(blockId).map { _.data.map {_.toString}.toList }.getOrElse(List.empty)
+        }.toList
+        storedData shouldEqual data
+
+        // Verify that the store results are instances of WriteAheadLogBasedStoreResult
+        assert(
+          storeResults.forall { _.isInstanceOf[WriteAheadLogBasedStoreResult] },
+          "Unexpected store result type"
+        )
+        // Verify the data in write ahead log files is correct
+        val fileSegments = storeResults.map { _.asInstanceOf[WriteAheadLogBasedStoreResult].segment}
+        val loggedData = fileSegments.flatMap { segment =>
+          val reader = new WriteAheadLogRandomReader(segment.path, hadoopConf)
+          val bytes = reader.read(segment)
+          reader.close()
+          blockManager.dataDeserialize(generateBlockId(), bytes).toList
+        }
+        loggedData shouldEqual data
+      }
+    }
+  }
+
+  test("WriteAheadLogBasedBlockHandler - handle errors in storing block") {
+    withWriteAheadLogBasedBlockHandler { handler =>
+      testErrorHandling(handler)
+    }
+  }
+
+  test("WriteAheadLogBasedBlockHandler - cleanup old blocks") {
+    withWriteAheadLogBasedBlockHandler { handler =>
+      val blocks = Seq.tabulate(10) { i => IteratorBlock(Iterator(1 to i)) }
+      storeBlocks(handler, blocks)
+
+      val preCleanupLogFiles = getWriteAheadLogFiles()
+      preCleanupLogFiles.size should be > 1
+
+      // this depends on the number of blocks inserted using generateAndStoreData()
+      manualClock.currentTime() shouldEqual 5000L
+
+      val cleanupThreshTime = 3000L
+      handler.cleanupOldBlock(cleanupThreshTime)
+      eventually(timeout(10000 millis), interval(10 millis)) {
+        getWriteAheadLogFiles().size should be < preCleanupLogFiles.size
+      }
+    }
+  }
+
+  /**
+   * Test storing of data using different forms of ReceivedBlocks and verify that they succeeded
+   * using the given verification function
+   */
+  private def testBlockStoring(receivedBlockHandler: ReceivedBlockHandler)
+      (verifyFunc: (Seq[String], Seq[StreamBlockId], Seq[ReceivedBlockStoreResult]) => Unit) {
+    val data = Seq.tabulate(100) { _.toString }
+
+    def storeAndVerify(blocks: Seq[ReceivedBlock]) {
+      blocks should not be empty
+      val (blockIds, storeResults) = storeBlocks(receivedBlockHandler, blocks)
+      withClue(s"Testing with ${blocks.head.getClass.getSimpleName}s:") {
+        // Verify returns store results have correct block ids
+        (storeResults.map { _.blockId }) shouldEqual blockIds
+
+        // Call handler-specific verification function
+        verifyFunc(data, blockIds, storeResults)
+      }
+    }
+
+    def dataToByteBuffer(b: Seq[String]) = blockManager.dataSerialize(generateBlockId, b.iterator)
+
+    val blocks = data.grouped(10).toSeq
+
+    storeAndVerify(blocks.map { b => IteratorBlock(b.toIterator) })
+    storeAndVerify(blocks.map { b => ArrayBufferBlock(new ArrayBuffer ++= b) })
+    storeAndVerify(blocks.map { b => ByteBufferBlock(dataToByteBuffer(b)) })
+  }
+
+  /** Test error handling when blocks that cannot be stored */
+  private def testErrorHandling(receivedBlockHandler: ReceivedBlockHandler) {
+    // Handle error in iterator (e.g. divide-by-zero error)
+    intercept[Exception] {
+      val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 }
+      receivedBlockHandler.storeBlock(StreamBlockId(1, 1), IteratorBlock(iterator))
+    }
+
+    // Handler error in block manager storing (e.g. too big block)
+    intercept[SparkException] {
+      val byteBuffer = ByteBuffer.wrap(new Array[Byte](blockManagerSize + 1))
+      receivedBlockHandler.storeBlock(StreamBlockId(1, 1), ByteBufferBlock(byteBuffer))
+    }
+  }
+
+  /** Instantiate a BlockManagerBasedBlockHandler and run a code with it */
+  private def withBlockManagerBasedBlockHandler(body: BlockManagerBasedBlockHandler => Unit) {
+    body(new BlockManagerBasedBlockHandler(blockManager, storageLevel))
+  }
+
+  /** Instantiate a WriteAheadLogBasedBlockHandler and run a code with it */
+  private def withWriteAheadLogBasedBlockHandler(body: WriteAheadLogBasedBlockHandler => Unit) {
+    val receivedBlockHandler = new WriteAheadLogBasedBlockHandler(blockManager, 1,
+      storageLevel, conf, hadoopConf, tempDirectory.toString, manualClock)
+    try {
+      body(receivedBlockHandler)
+    } finally {
+      receivedBlockHandler.stop()
+    }
+  }
+
+  /** Store blocks using a handler */
+  private def storeBlocks(
+      receivedBlockHandler: ReceivedBlockHandler,
+      blocks: Seq[ReceivedBlock]
+    ): (Seq[StreamBlockId], Seq[ReceivedBlockStoreResult]) = {
+    val blockIds = Seq.fill(blocks.size)(generateBlockId())
+    val storeResults = blocks.zip(blockIds).map {
+      case (block, id) =>
+        manualClock.addToTime(500) // log rolling interval set to 1000 ms through SparkConf
+        logDebug("Inserting block " + id)
+        receivedBlockHandler.storeBlock(id, block)
+    }.toList
+    logDebug("Done inserting")
+    (blockIds, storeResults)
+  }
+
+  private def getWriteAheadLogFiles(): Seq[String] = {
+    getLogFilesInDirectory(checkpointDirToLogDir(tempDirectory.toString, streamId))
+  }
+
+  private def generateBlockId(): StreamBlockId = StreamBlockId(streamId, scala.util.Random.nextLong)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/234de923/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 5eba93c..1956a4f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -58,7 +58,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
   test("WriteAheadLogWriter - writing data") {
     val dataToWrite = generateRandomData()
     val segments = writeDataUsingWriter(testFile, dataToWrite)
-    val writtenData = readDataManually(testFile, segments)
+    val writtenData = readDataManually(segments)
     assert(writtenData === dataToWrite)
   }
 
@@ -67,7 +67,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
     val writer = new WriteAheadLogWriter(testFile, hadoopConf)
     dataToWrite.foreach { data =>
       val segment = writer.write(stringToByteBuffer(data))
-      val dataRead = readDataManually(testFile, Seq(segment)).head
+      val dataRead = readDataManually(Seq(segment)).head
       assert(data === dataRead)
     }
     writer.close()
@@ -281,14 +281,20 @@ object WriteAheadLogSuite {
   }
 
   /** Read data from a segments of a log file directly and return the list of byte buffers.*/
-  def readDataManually(file: String, segments: Seq[WriteAheadLogFileSegment]): Seq[String] = {
-    val reader = HdfsUtils.getInputStream(file, hadoopConf)
-    segments.map { x =>
-      reader.seek(x.offset)
-      val data = new Array[Byte](x.length)
-      reader.readInt()
-      reader.readFully(data)
-      Utils.deserialize[String](data)
+  def readDataManually(segments: Seq[WriteAheadLogFileSegment]): Seq[String] = {
+    segments.map { segment =>
+      val reader = HdfsUtils.getInputStream(segment.path, hadoopConf)
+      try {
+        reader.seek(segment.offset)
+        val bytes = new Array[Byte](segment.length)
+        reader.readInt()
+        reader.readFully(bytes)
+        val data = Utils.deserialize[String](bytes)
+        reader.close()
+        data
+      } finally {
+        reader.close()
+      }
     }
   }
 
@@ -335,9 +341,11 @@ object WriteAheadLogSuite {
     val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
 
     if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
-      fileSystem.listStatus(logDirectoryPath).map {
-        _.getPath.toString.stripPrefix("file:")
-      }.sorted
+      fileSystem.listStatus(logDirectoryPath).map { _.getPath() }.sortBy {
+        _.getName().split("-")(1).toLong
+      }.map {
+        _.toString.stripPrefix("file:")
+      }
     } else {
       Seq.empty
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org