You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/12/18 09:08:18 UTC

spark git commit: [SPARK-18827][CORE] Fix cannot read broadcast on disk

Repository: spark
Updated Branches:
  refs/heads/master c0c9e1d27 -> 1e5c51f33


[SPARK-18827][CORE] Fix cannot read broadcast on disk

## What changes were proposed in this pull request?
`NoSuchElementException` will throw since https://github.com/apache/spark/pull/15056 if a broadcast cannot cache in memory. The reason is that that change cannot cover `!unrolled.hasNext` in `next()` function.

This change is to cover the `!unrolled.hasNext` and check `hasNext` before calling `next` in `blockManager.getLocalValues` to make it  more robust.

We can cache and read broadcast even it cannot fit in memory from this pull request.

Exception log:
```
16/12/10 10:10:04 INFO UnifiedMemoryManager: Will not store broadcast_131 as the required space (1048576 bytes) exceeds our memory limit (122764 bytes)
16/12/10 10:10:04 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_131 in memory.
16/12/10 10:10:04 WARN MemoryStore: Not enough space to cache broadcast_131 in memory! (computed 384.0 B so far)
16/12/10 10:10:04 INFO MemoryStore: Memory use = 95.6 KB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 95.6 KB. Storage limit = 119.9 KB.
16/12/10 10:10:04 ERROR Utils: Exception encountered
java.util.NoSuchElementException
	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
16/12/10 10:10:04 ERROR Executor: Exception in task 1.0 in stage 86.0 (TID 134423)
java.io.IOException: java.util.NoSuchElementException
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
	... 12 more
```

## How was this patch tested?

Add unit test

Author: Yuming Wang <wg...@gmail.com>

Closes #16252 from wangyum/SPARK-18827.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e5c51f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e5c51f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e5c51f3

Branch: refs/heads/master
Commit: 1e5c51f336b90cd1eed43e9c6cf00faee696174c
Parents: c0c9e1d
Author: Yuming Wang <wg...@gmail.com>
Authored: Sun Dec 18 09:08:02 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Dec 18 09:08:02 2016 +0000

----------------------------------------------------------------------
 .../org/apache/spark/broadcast/TorrentBroadcast.scala | 14 +++++++++-----
 .../org/apache/spark/storage/memory/MemoryStore.scala |  2 +-
 .../org/apache/spark/broadcast/BroadcastSuite.scala   | 12 ++++++++++++
 3 files changed, 22 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1e5c51f3/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index f350784..22d01c4 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -207,11 +207,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
     TorrentBroadcast.synchronized {
       setConf(SparkEnv.get.conf)
       val blockManager = SparkEnv.get.blockManager
-      blockManager.getLocalValues(broadcastId).map(_.data.next()) match {
-        case Some(x) =>
-          releaseLock(broadcastId)
-          x.asInstanceOf[T]
-
+      blockManager.getLocalValues(broadcastId) match {
+        case Some(blockResult) =>
+          if (blockResult.data.hasNext) {
+            val x = blockResult.data.next().asInstanceOf[T]
+            releaseLock(broadcastId)
+            x
+          } else {
+            throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
+          }
         case None =>
           logInfo("Started reading broadcast variable " + id)
           val startTimeMs = System.currentTimeMillis()

http://git-wip-us.apache.org/repos/asf/spark/blob/1e5c51f3/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index c08275c..fb54dd6 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -702,7 +702,7 @@ private[storage] class PartiallyUnrolledIterator[T](
   }
 
   override def next(): T = {
-    if (unrolled == null) {
+    if (unrolled == null || !unrolled.hasNext) {
       rest.next()
     } else {
       unrolled.next()

http://git-wip-us.apache.org/repos/asf/spark/blob/1e5c51f3/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 9736763..6646068 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -137,6 +137,18 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
     sc.stop()
   }
 
+  test("Cache broadcast to disk") {
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test")
+      .set("spark.memory.useLegacyMode", "true")
+      .set("spark.storage.memoryFraction", "0.0")
+    sc = new SparkContext(conf)
+    val list = List[Int](1, 2, 3, 4)
+    val broadcast = sc.broadcast(list)
+    assert(broadcast.value.sum === 10)
+  }
+
   /**
    * Verify the persistence of state associated with a TorrentBroadcast in a local-cluster.
    *


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org