Posted to commits@spark.apache.org by va...@apache.org on 2018/02/26 16:39:52 UTC

spark git commit: [SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes

Repository: spark
Updated Branches:
  refs/heads/master 3ca9a2c56 -> b308182f2


[SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes

## What changes were proposed in this pull request?

There is a race condition, introduced in SPARK-11141, which could cause data loss.
The problem is that the ReceivedBlockTracker.insertAllocatedBatch function assumes that all blocks in streamIdToUnallocatedBlockQueues were allocated to the batch, and clears the queues wholesale; any blocks that were received but not yet allocated are silently dropped.

In this PR, only the allocated blocks are removed from the queues, which prevents the data loss.
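
To make the race concrete, the sketch below contrasts the old and new recovery behavior. It is a simplified stand-in with hypothetical names (BlockInfo, insertAllocatedBatchOld/New), not the tracker's real API; the actual change is in ReceivedBlockTracker.scala further down.

import scala.collection.mutable

object InsertAllocatedBatchSketch {
  type BlockInfo = String  // stand-in for ReceivedBlockInfo

  val streamIdToUnallocatedBlockQueues =
    mutable.HashMap.empty[Int, mutable.Queue[BlockInfo]]

  def getReceivedBlockQueue(streamId: Int): mutable.Queue[BlockInfo] =
    streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, mutable.Queue.empty[BlockInfo])

  // Old behavior: clear every queue on recovery. Blocks that reached the WAL
  // after the batch allocation, but were not yet allocated themselves, are
  // dropped -- the data loss this PR fixes.
  def insertAllocatedBatchOld(allocated: Map[Int, Seq[BlockInfo]]): Unit =
    streamIdToUnallocatedBlockQueues.values.foreach(_.clear())

  // New behavior: dequeue only the blocks recorded in the allocation event;
  // still-unallocated blocks stay queued and survive recovery.
  def insertAllocatedBatchNew(allocated: Map[Int, Seq[BlockInfo]]): Unit =
    allocated.foreach { case (streamId, blocks) =>
      getReceivedBlockQueue(streamId).dequeueAll(blocks.toSet)
    }

  def main(args: Array[String]): Unit = {
    getReceivedBlockQueue(0) ++= Seq("allocated-1", "allocated-2", "unallocated-1")
    insertAllocatedBatchNew(Map(0 -> Seq("allocated-1", "allocated-2")))
    // Only the unallocated block remains queued after recovery, which is the
    // guarantee the new unit test below asserts on the real tracker.
    assert(getReceivedBlockQueue(0).toSeq == Seq("unallocated-1"))
  }
}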

## How was this patch tested?

Additional unit test + manually.

Author: Gabor Somogyi <ga...@gmail.com>

Closes #20620 from gaborgsomogyi/SPARK-23438.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b308182f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b308182f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b308182f

Branch: refs/heads/master
Commit: b308182f233b8840dfe0e6b5736d2f2746f40757
Parents: 3ca9a2c
Author: Gabor Somogyi <ga...@gmail.com>
Authored: Mon Feb 26 08:39:44 2018 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Mon Feb 26 08:39:44 2018 -0800

----------------------------------------------------------------------
 .../scheduler/ReceivedBlockTracker.scala        | 11 ++++++----
 .../streaming/ReceivedBlockTrackerSuite.scala   | 23 +++++++++++++++++++-
 2 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b308182f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 5d9a8ac..dacff69 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -193,12 +193,15 @@ private[streaming] class ReceivedBlockTracker(
       getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
     }
 
-    // Insert the recovered block-to-batch allocations and clear the queue of received blocks
-    // (when the blocks were originally allocated to the batch, the queue must have been cleared).
+    // Insert the recovered block-to-batch allocations and remove them from the queue of
+    // received blocks.
     def insertAllocatedBatch(batchTime: Time, allocatedBlocks: AllocatedBlocks) {
       logTrace(s"Recovery: Inserting allocated batch for time $batchTime to " +
         s"${allocatedBlocks.streamIdToAllocatedBlocks}")
-      streamIdToUnallocatedBlockQueues.values.foreach { _.clear() }
+      allocatedBlocks.streamIdToAllocatedBlocks.foreach {
+        case (streamId, allocatedBlocksInStream) =>
+          getReceivedBlockQueue(streamId).dequeueAll(allocatedBlocksInStream.toSet)
+      }
       timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
       lastAllocatedBatchTime = batchTime
     }
@@ -227,7 +230,7 @@ private[streaming] class ReceivedBlockTracker(
   }
 
   /** Write an update to the tracker to the write ahead log */
-  private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
+  private[streaming] def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
     if (isWriteAheadLogEnabled) {
       logTrace(s"Writing record: $record")
       try {

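A note on the idiom in the new code above: mutable.Queue.dequeueAll takes a predicate and removes (and returns) every element satisfying it, and passing allocatedBlocksInStream.toSet works because a Scala Set[A] extends A => Boolean. A self-contained illustration with made-up values:

import scala.collection.mutable

object DequeueAllDemo extends App {
  val q = mutable.Queue(1, 2, 3, 4)
  // A Set[Int] is also a function Int => Boolean, so it can serve as the
  // membership predicate passed to dequeueAll.
  val removed = q.dequeueAll(Set(2, 4))
  assert(removed.toList == List(2, 4))
  assert(q.toList == List(1, 3))
}
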
http://git-wip-us.apache.org/repos/asf/spark/blob/b308182f/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 107c3f5..4fa236b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
-import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.scheduler.{AllocatedBlocks, _}
 import org.apache.spark.streaming.util._
 import org.apache.spark.streaming.util.WriteAheadLogSuite._
 import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
@@ -94,6 +94,27 @@ class ReceivedBlockTrackerSuite
     receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
   }
 
+  test("recovery with write ahead logs should remove only allocated blocks from received queue") {
+    val manualClock = new ManualClock
+    val batchTime = manualClock.getTimeMillis()
+
+    val tracker1 = createTracker(clock = manualClock)
+    tracker1.isWriteAheadLogEnabled should be (true)
+
+    val allocatedBlockInfos = generateBlockInfos()
+    val unallocatedBlockInfos = generateBlockInfos()
+    val receivedBlockInfos = allocatedBlockInfos ++ unallocatedBlockInfos
+    receivedBlockInfos.foreach { b => tracker1.writeToLog(BlockAdditionEvent(b)) }
+    val allocatedBlocks = AllocatedBlocks(Map(streamId -> allocatedBlockInfos))
+    tracker1.writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))
+    tracker1.stop()
+
+    val tracker2 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
+    tracker2.getBlocksOfBatch(batchTime) shouldEqual allocatedBlocks.streamIdToAllocatedBlocks
+    tracker2.getUnallocatedBlocks(streamId) shouldEqual unallocatedBlockInfos
+    tracker2.stop()
+  }
+
   test("recovery and cleanup with write ahead logs") {
     val manualClock = new ManualClock
     // Set the time increment level to twice the rotation interval so that every increment creates

