Posted to commits@spark.apache.org by td...@apache.org on 2014/11/19 22:15:56 UTC

spark git commit: [SPARK-4482][Streaming] Disable ReceivedBlockTracker's write ahead log by default

Repository: spark
Updated Branches:
  refs/heads/master eacc78834 -> 22fc4e751


[SPARK-4482][Streaming] Disable ReceivedBlockTracker's write ahead log by default

The write ahead log of ReceivedBlockTracker gets enabled as soon as a checkpoint directory is set. This should not happen: the tracker's WAL should be enabled only when the write ahead log is explicitly enabled in the Spark configuration (spark.streaming.receiver.writeAheadLog.enable).
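
For users, the behavior after this patch can be summarized with a minimal sketch (the master, app name, batch interval, and checkpoint path below are illustrative, not prescribed by this commit):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // The tracker WAL must now be requested explicitly; setting a
    // checkpoint directory alone no longer turns it on.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("WALExample")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(1))
    // Required once the WAL flag is set; without it the tracker
    // now throws a SparkException at construction.
    ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")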

Author: Tathagata Das <ta...@gmail.com>

Closes #3358 from tdas/SPARK-4482 and squashes the following commits:

b740136 [Tathagata Das] Fixed bug in ReceivedBlockTracker


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/22fc4e75
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/22fc4e75
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/22fc4e75

Branch: refs/heads/master
Commit: 22fc4e751c0a2f0ff39e42aa0a8fb9459d7412ec
Parents: eacc788
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Nov 19 13:06:48 2014 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Nov 19 13:06:48 2014 -0800

----------------------------------------------------------------------
 .../scheduler/ReceivedBlockTracker.scala        | 37 ++++++++++-----
 .../streaming/ReceivedBlockTrackerSuite.scala   | 50 ++++++++++++++------
 2 files changed, 61 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/22fc4e75/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 5f5e190..02758e0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -70,18 +70,7 @@ private[streaming] class ReceivedBlockTracker(
   
   private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
   private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
-
-  private val logManagerRollingIntervalSecs = conf.getInt(
-    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
-  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
-    new WriteAheadLogManager(
-      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
-      hadoopConf,
-      rollingIntervalSecs = logManagerRollingIntervalSecs,
-      callerName = "ReceivedBlockHandlerMaster",
-      clock = clock
-    )
-  }
+  private val logManagerOption = createLogManager()
 
   private var lastAllocatedBatchTime: Time = null
 
@@ -221,6 +210,30 @@ private[streaming] class ReceivedBlockTracker(
   private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
     streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
   }
+
+  /** Optionally create the write ahead log manager only if the feature is enabled */
+  private def createLogManager(): Option[WriteAheadLogManager] = {
+    if (conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
+      if (checkpointDirOption.isEmpty) {
+        throw new SparkException(
+          "Cannot enable receiver write-ahead log without checkpoint directory set. " +
+            "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
+            "See documentation for more details.")
+      }
+      val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
+      val rollingIntervalSecs = conf.getInt(
+        "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
+      val logManager = new WriteAheadLogManager(logDir, hadoopConf,
+        rollingIntervalSecs = rollingIntervalSecs, clock = clock,
+        callerName = "ReceivedBlockHandlerMaster")
+      Some(logManager)
+    } else {
+      None
+    }
+  }
+
+  /** Check if the log manager is enabled. This is only used for testing purposes. */
+  private[streaming] def isLogManagerEnabled: Boolean = logManagerOption.nonEmpty
 }
 
 private[streaming] object ReceivedBlockTracker {
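
The createLogManager() change above follows a reusable pattern: gate creation on a boolean config, fail fast when a prerequisite is missing, and otherwise hand back an Option so call sites need no null checks. A minimal standalone sketch under hypothetical names (GuardedWal and createWal are illustrative, not Spark API):

    import org.apache.spark.{SparkConf, SparkException}

    // Stand-in for WriteAheadLogManager in this sketch.
    class GuardedWal(val logDir: String)

    def createWal(conf: SparkConf, checkpointDirOption: Option[String]): Option[GuardedWal] = {
      if (conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
        // Requested but unusable without a checkpoint dir: fail loudly
        // rather than silently running without durability.
        val checkpointDir = checkpointDirOption.getOrElse(
          throw new SparkException("WAL enabled but no checkpoint directory set"))
        Some(new GuardedWal(checkpointDir))
      } else {
        None  // Disabled: callers use Option.foreach / nonEmpty.
      }
    }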

http://git-wip-us.apache.org/repos/asf/spark/blob/22fc4e75/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index fd9c97f..01a09b6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -41,17 +41,16 @@ import org.apache.spark.util.Utils
 class ReceivedBlockTrackerSuite
   extends FunSuite with BeforeAndAfter with Matchers with Logging {
 
-  val conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite")
-  conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", "1")
-
   val hadoopConf = new Configuration()
   val akkaTimeout = 10 seconds
   val streamId = 1
 
   var allReceivedBlockTrackers = new ArrayBuffer[ReceivedBlockTracker]()
   var checkpointDirectory: File = null
+  var conf: SparkConf = null
 
   before {
+    conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite")
     checkpointDirectory = Files.createTempDir()
   }
 
@@ -64,7 +63,8 @@ class ReceivedBlockTrackerSuite
   }
 
   test("block addition, and block to batch allocation") {
-    val receivedBlockTracker = createTracker(enableCheckpoint = false)
+    val receivedBlockTracker = createTracker(setCheckpointDir = false)
+    receivedBlockTracker.isLogManagerEnabled should be (false)  // should be disabled by default
     receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
 
     val blockInfos = generateBlockInfos()
@@ -95,13 +95,11 @@ class ReceivedBlockTrackerSuite
 
   test("block addition, block to batch allocation and cleanup with write ahead log") {
     val manualClock = new ManualClock
-    conf.getInt(
-      "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", -1) should be (1)
-
     // Set the time increment level to twice the rotation interval so that every increment creates
     // a new log file
-    val timeIncrementMillis = 2000L
+
     def incrementTime() {
+      val timeIncrementMillis = 2000L
       manualClock.addToTime(timeIncrementMillis)
     }
 
@@ -121,7 +119,11 @@ class ReceivedBlockTrackerSuite
     }
 
     // Start tracker and add blocks
-    val tracker1 = createTracker(enableCheckpoint = true, clock = manualClock)
+    conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
+    conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", "1")
+    val tracker1 = createTracker(clock = manualClock)
+    tracker1.isLogManagerEnabled should be (true)
+
     val blockInfos1 = addBlockInfos(tracker1)
     tracker1.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
 
@@ -132,7 +134,7 @@ class ReceivedBlockTrackerSuite
 
     // Restart tracker and verify recovered list of unallocated blocks
     incrementTime()
-    val tracker2 = createTracker(enableCheckpoint = true, clock = manualClock)
+    val tracker2 = createTracker(clock = manualClock)
     tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
 
     // Allocate blocks to batch and verify whether the unallocated blocks got allocated
@@ -156,7 +158,7 @@ class ReceivedBlockTrackerSuite
 
     // Restart tracker and verify recovered state
     incrementTime()
-    val tracker3 = createTracker(enableCheckpoint = true, clock = manualClock)
+    val tracker3 = createTracker(clock = manualClock)
     tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1
     tracker3.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
     tracker3.getUnallocatedBlocks(streamId) shouldBe empty
@@ -179,18 +181,38 @@ class ReceivedBlockTrackerSuite
     // Restart tracker and verify recovered state, specifically whether info about the first
     // batch has been removed, but not the second batch
     incrementTime()
-    val tracker4 = createTracker(enableCheckpoint = true, clock = manualClock)
+    val tracker4 = createTracker(clock = manualClock)
     tracker4.getUnallocatedBlocks(streamId) shouldBe empty
     tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty  // should be cleaned
     tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
   }
+  
+  test("enabling write ahead log but not setting checkpoint dir") {
+    conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
+    intercept[SparkException] {
+      createTracker(setCheckpointDir = false)
+    }
+  }
+  
+  test("setting checkpoint dir but not enabling write ahead log") {
+    // When WAL config is not set, log manager should not be enabled
+    val tracker1 = createTracker(setCheckpointDir = true)
+    tracker1.isLogManagerEnabled should be (false)
+
+    // When WAL is explicitly disabled, log manager should not be enabled
+    conf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
+    val tracker2 = createTracker(setCheckpointDir = true)
+    tracker2.isLogManagerEnabled should be(false)
+  }
 
   /**
    * Create tracker object with the optional provided clock. Use fake clock if you
    * want to control time by manually incrementing it to test log cleanup.
    */
-  def createTracker(enableCheckpoint: Boolean, clock: Clock = new SystemClock): ReceivedBlockTracker = {
-    val cpDirOption = if (enableCheckpoint) Some(checkpointDirectory.toString) else None
+  def createTracker(
+      setCheckpointDir: Boolean = true,
+      clock: Clock = new SystemClock): ReceivedBlockTracker = {
+    val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
     val tracker = new ReceivedBlockTracker(conf, hadoopConf, Seq(streamId), clock, cpDirOption)
     allReceivedBlockTrackers += tracker
     tracker

