You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/12/18 09:08:18 UTC
spark git commit: [SPARK-18827][CORE] Fix cannot read broadcast on
disk
Repository: spark
Updated Branches:
refs/heads/master c0c9e1d27 -> 1e5c51f33
[SPARK-18827][CORE] Fix cannot read broadcast on disk
## What changes were proposed in this pull request?
`NoSuchElementException` will throw since https://github.com/apache/spark/pull/15056 if a broadcast cannot cache in memory. The reason is that that change cannot cover `!unrolled.hasNext` in `next()` function.
This change is to cover the `!unrolled.hasNext` and check `hasNext` before calling `next` in `blockManager.getLocalValues` to make it more robust.
We can cache and read broadcast even it cannot fit in memory from this pull request.
Exception log:
```
16/12/10 10:10:04 INFO UnifiedMemoryManager: Will not store broadcast_131 as the required space (1048576 bytes) exceeds our memory limit (122764 bytes)
16/12/10 10:10:04 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_131 in memory.
16/12/10 10:10:04 WARN MemoryStore: Not enough space to cache broadcast_131 in memory! (computed 384.0 B so far)
16/12/10 10:10:04 INFO MemoryStore: Memory use = 95.6 KB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 95.6 KB. Storage limit = 119.9 KB.
16/12/10 10:10:04 ERROR Utils: Exception encountered
java.util.NoSuchElementException
at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
at scala.Option.map(Option.scala:146)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/12/10 10:10:04 ERROR Executor: Exception in task 1.0 in stage 86.0 (TID 134423)
java.io.IOException: java.util.NoSuchElementException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
at scala.Option.map(Option.scala:146)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
... 12 more
```
## How was this patch tested?
Add unit test
Author: Yuming Wang <wg...@gmail.com>
Closes #16252 from wangyum/SPARK-18827.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e5c51f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e5c51f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e5c51f3
Branch: refs/heads/master
Commit: 1e5c51f336b90cd1eed43e9c6cf00faee696174c
Parents: c0c9e1d
Author: Yuming Wang <wg...@gmail.com>
Authored: Sun Dec 18 09:08:02 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Dec 18 09:08:02 2016 +0000
----------------------------------------------------------------------
.../org/apache/spark/broadcast/TorrentBroadcast.scala | 14 +++++++++-----
.../org/apache/spark/storage/memory/MemoryStore.scala | 2 +-
.../org/apache/spark/broadcast/BroadcastSuite.scala | 12 ++++++++++++
3 files changed, 22 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1e5c51f3/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index f350784..22d01c4 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -207,11 +207,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
TorrentBroadcast.synchronized {
setConf(SparkEnv.get.conf)
val blockManager = SparkEnv.get.blockManager
- blockManager.getLocalValues(broadcastId).map(_.data.next()) match {
- case Some(x) =>
- releaseLock(broadcastId)
- x.asInstanceOf[T]
-
+ blockManager.getLocalValues(broadcastId) match {
+ case Some(blockResult) =>
+ if (blockResult.data.hasNext) {
+ val x = blockResult.data.next().asInstanceOf[T]
+ releaseLock(broadcastId)
+ x
+ } else {
+ throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
+ }
case None =>
logInfo("Started reading broadcast variable " + id)
val startTimeMs = System.currentTimeMillis()
http://git-wip-us.apache.org/repos/asf/spark/blob/1e5c51f3/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index c08275c..fb54dd6 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -702,7 +702,7 @@ private[storage] class PartiallyUnrolledIterator[T](
}
override def next(): T = {
- if (unrolled == null) {
+ if (unrolled == null || !unrolled.hasNext) {
rest.next()
} else {
unrolled.next()
http://git-wip-us.apache.org/repos/asf/spark/blob/1e5c51f3/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 9736763..6646068 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -137,6 +137,18 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
sc.stop()
}
+ test("Cache broadcast to disk") {
+ val conf = new SparkConf()
+ .setMaster("local")
+ .setAppName("test")
+ .set("spark.memory.useLegacyMode", "true")
+ .set("spark.storage.memoryFraction", "0.0")
+ sc = new SparkContext(conf)
+ val list = List[Int](1, 2, 3, 4)
+ val broadcast = sc.broadcast(list)
+ assert(broadcast.value.sum === 10)
+ }
+
/**
* Verify the persistence of state associated with a TorrentBroadcast in a local-cluster.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org