You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2016/04/20 01:15:51 UTC

spark git commit: Revert "[SPARK-14719] WriteAheadLogBasedBlockHandler should ignore BlockManager put errors"

Repository: spark
Updated Branches:
  refs/heads/master ecd877e83 -> a685e65a4


Revert "[SPARK-14719] WriteAheadLogBasedBlockHandler should ignore BlockManager put errors"

This reverts commit ed2de0299a5a54b566b91ae9f47b6626c484c1d3.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a685e65a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a685e65a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a685e65a

Branch: refs/heads/master
Commit: a685e65a4ca0b300b12103fccbda29cb08221f5d
Parents: ecd877e
Author: Josh Rosen <jo...@databricks.com>
Authored: Tue Apr 19 16:15:06 2016 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Tue Apr 19 16:15:06 2016 -0700

----------------------------------------------------------------------
 .../receiver/ReceivedBlockHandler.scala         | 22 ++++-----
 .../streaming/ReceivedBlockHandlerSuite.scala   | 49 +++++++++-----------
 2 files changed, 31 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a685e65a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index a7d8705..c4bc5cf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -20,7 +20,6 @@ package org.apache.spark.streaming.receiver
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.language.{existentials, postfixOps}
-import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -190,19 +189,14 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
 
     // Store the block in block manager
     val storeInBlockManagerFuture = Future {
-      try {
-        val putSucceeded = blockManager.putBytes(
-          blockId,
-          serializedBlock,
-          effectiveStorageLevel,
-          tellMaster = true)
-        if (!putSucceeded) {
-          logWarning(
-            s"Could not store $blockId to block manager with storage level $storageLevel")
-        }
-      } catch {
-        case NonFatal(t) =>
-          logError(s"Could not store $blockId to block manager with storage level $storageLevel", t)
+      val putSucceeded = blockManager.putBytes(
+        blockId,
+        serializedBlock,
+        effectiveStorageLevel,
+        tellMaster = true)
+      if (!putSucceeded) {
+        throw new SparkException(
+          s"Could not store $blockId to block manager with storage level $storageLevel")
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a685e65a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index ea87b0d..4be4882 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -127,17 +127,7 @@ class ReceivedBlockHandlerSuite
 
   test("BlockManagerBasedBlockHandler - handle errors in storing block") {
     withBlockManagerBasedBlockHandler { handler =>
-      // Handle error in iterator (e.g. divide-by-zero error)
-      intercept[Exception] {
-        val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 }
-        handler.storeBlock(StreamBlockId(1, 1), IteratorBlock(iterator))
-      }
-
-      // Handler error in block manager storing (e.g. too big block)
-      intercept[SparkException] {
-        val byteBuffer = ByteBuffer.wrap(new Array[Byte](blockManagerSize + 1))
-        handler.storeBlock(StreamBlockId(1, 1), ByteBufferBlock(byteBuffer))
-      }
+      testErrorHandling(handler)
     }
   }
 
@@ -177,15 +167,7 @@ class ReceivedBlockHandlerSuite
 
   test("WriteAheadLogBasedBlockHandler - handle errors in storing block") {
     withWriteAheadLogBasedBlockHandler { handler =>
-      // Handle error in iterator (e.g. divide-by-zero error)
-      intercept[Exception] {
-        val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 }
-        handler.storeBlock(StreamBlockId(1, 1), IteratorBlock(iterator))
-      }
-
-      // Throws no errors when storing blocks that are too large to be cached
-      val byteBuffer = ByteBuffer.wrap(new Array[Byte](blockManagerSize + 1))
-      handler.storeBlock(StreamBlockId(1, 1), ByteBufferBlock(byteBuffer))
+      testErrorHandling(handler)
     }
   }
 
@@ -222,26 +204,26 @@ class ReceivedBlockHandlerSuite
     sparkConf.set("spark.storage.unrollFraction", "0.4")
     // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll
     blockManager = createBlockManager(12000, sparkConf)
-    // This block is way too large to possibly be cached in memory:
-    def hugeBlock: IteratorBlock = IteratorBlock(List.fill(100)(new Array[Byte](1000)).iterator)
 
     // there is not enough space to store this block in MEMORY,
     // But BlockManager will be able to serialize this block to WAL
     // and hence count returns correct value.
-    testRecordcount(false, StorageLevel.MEMORY_ONLY, hugeBlock, blockManager, Some(100))
+     testRecordcount(false, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
 
     // there is not enough space to store this block in MEMORY,
     // But BlockManager will be able to serialize this block to DISK
     // and hence count returns correct value.
-    testRecordcount(true, StorageLevel.MEMORY_AND_DISK, hugeBlock, blockManager, Some(100))
+    testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
 
     // there is not enough space to store this block With MEMORY_ONLY StorageLevel.
     // BlockManager will not be able to unroll this block
     // and hence it will not tryToPut this block, resulting the SparkException
     storageLevel = StorageLevel.MEMORY_ONLY
     withBlockManagerBasedBlockHandler { handler =>
-      intercept[SparkException] {
-        storeSingleBlock(handler, hugeBlock)
+      val thrown = intercept[SparkException] {
+        storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator))
       }
     }
   }
@@ -364,6 +346,21 @@ class ReceivedBlockHandlerSuite
     storeAndVerify(blocks.map { b => ByteBufferBlock(dataToByteBuffer(b).toByteBuffer) })
   }
 
+  /** Test error handling when blocks that cannot be stored */
+  private def testErrorHandling(receivedBlockHandler: ReceivedBlockHandler) {
+    // Handle error in iterator (e.g. divide-by-zero error)
+    intercept[Exception] {
+      val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 }
+      receivedBlockHandler.storeBlock(StreamBlockId(1, 1), IteratorBlock(iterator))
+    }
+
+    // Handler error in block manager storing (e.g. too big block)
+    intercept[SparkException] {
+      val byteBuffer = ByteBuffer.wrap(new Array[Byte](blockManagerSize + 1))
+      receivedBlockHandler.storeBlock(StreamBlockId(1, 1), ByteBufferBlock(byteBuffer))
+    }
+  }
+
   /** Instantiate a BlockManagerBasedBlockHandler and run a code with it */
   private def withBlockManagerBasedBlockHandler(body: BlockManagerBasedBlockHandler => Unit) {
     body(new BlockManagerBasedBlockHandler(blockManager, storageLevel))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org