Posted to issues@spark.apache.org by "Shubham Chopra (JIRA)" <ji...@apache.org> on 2016/07/14 22:06:20 UTC
[jira] [Comment Edited] (SPARK-16550) Caching data with replication doesn't replicate data
[ https://issues.apache.org/jira/browse/SPARK-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15378478#comment-15378478 ]
Shubham Chopra edited comment on SPARK-16550 at 7/14/16 10:05 PM:
------------------------------------------------------------------
Example code:
{quote}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

case class TestInteger(i: Int)

object TestApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Test app").setMaster("yarn-client")
    val sc = new SparkContext(conf)

    // Total number of cached blocks of an RDD across all executors.
    // 10 partitions with a *_2 storage level should give 20 blocks.
    def totalBlocks(rddId: Int): Int =
      sc.getExecutorStorageStatus.map(_.rddBlocksById(rddId).size).sum

    // Scala Int, deserialized, 2 replicas
    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2)
    println(data.count)
    println(s"Total number of blocks in data: ${totalBlocks(data.id)}")

    // Case class, deserialized, 2 replicas
    val dataTestInt = sc.parallelize((1 to 1000).map(TestInteger(_)), 10).persist(StorageLevel.MEMORY_ONLY_2)
    println(dataTestInt.count)
    println(s"Total number of blocks in dataTestInt: ${totalBlocks(dataTestInt.id)}")

    // java.lang.Integer, deserialized, 2 replicas
    val dataInteger = sc.parallelize((1 to 1000).map(new Integer(_)), 10).persist(StorageLevel.MEMORY_ONLY_2)
    println(dataInteger.count)
    println(s"Total number of blocks in dataInteger: ${totalBlocks(dataInteger.id)}")

    // Scala Int, serialized, 2 replicas
    val dataSerialized = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_SER_2)
    println(dataSerialized.count)
    println(s"Total number of blocks in dataSerialized: ${totalBlocks(dataSerialized.id)}")

    // Case class, serialized, 2 replicas
    val dataTestIntSer = sc.parallelize((1 to 1000).map(TestInteger(_)), 10).persist(StorageLevel.MEMORY_ONLY_SER_2)
    println(dataTestIntSer.count)
    println(s"Total number of blocks in dataTestIntSer: ${totalBlocks(dataTestIntSer.id)}")

    sc.stop()
  }
}
{quote}
Output:
{quote}
1000
Total number of blocks in data: 10
1000
Total number of blocks in dataTestInt: 10
1000
Total number of blocks in dataInteger: 20
1000
Total number of blocks in dataSerialized: 20
1000
Total number of blocks in dataTestIntSer: 10
{quote}
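Each RDD above has 10 partitions and uses a replication factor of 2. A fully replicated RDD should therefore report 20 blocks. The short sketch below checks the printed totals against that number. It is plain Scala with no Spark dependency, and the names BlockCountCheck and underReplicated are hypothetical.

```scala
// A minimal sketch: compare the block totals printed above against the count
// expected for 10 partitions cached with a replication factor of 2.
object BlockCountCheck {
  val numPartitions = 10 // from sc.parallelize(..., 10)
  val replication = 2    // from the *_2 storage levels

  // Observed totals, copied from the output above.
  val observed: Seq[(String, Int)] = Seq(
    "data" -> 10,
    "dataTestInt" -> 10,
    "dataInteger" -> 20,
    "dataSerialized" -> 20,
    "dataTestIntSer" -> 10
  )

  // Names whose total block count is below partitions * factor.
  def underReplicated(counts: Seq[(String, Int)], partitions: Int, factor: Int): Seq[String] = {
    val expected = partitions * factor
    counts.collect { case (name, total) if total < expected => name }
  }

  def main(args: Array[String]): Unit = {
    val missing = underReplicated(observed, numPartitions, replication)
    println(s"Expected ${numPartitions * replication} blocks per RDD")
    println(s"Under-replicated: ${missing.mkString(", ")}")
  }
}
```

Running main prints "Expected 20 blocks per RDD" followed by "Under-replicated: data, dataTestInt, dataTestIntSer". Only dataInteger and dataSerialized reach the expected total.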
The issue also occurs when I submit a compiled program, not only in the shell. The exception stack traces are similar to the ones in the issue description.
I suspect part of the problem is related to https://issues.apache.org/jira/browse/SPARK-13990
This code works fine in Spark 1.6.2, both in the shell and when submitted as compiled code.
> Caching data with replication doesn't replicate data
> ----------------------------------------------------
>
> Key: SPARK-16550
> URL: https://issues.apache.org/jira/browse/SPARK-16550
> Project: Spark
> Issue Type: Bug
> Components: Block Manager, Spark Core
> Affects Versions: 2.0.0
> Reporter: Shubham Chopra
>
> Caching multiple replicas of blocks is currently broken. The following examples show that replication does not happen in several use cases.
> They were run with Spark 2.0.0-preview in local-cluster[2,1,1024] mode.
> import org.apache.spark.storage.StorageLevel._
> case class TestInteger(i: Int)
> val data = sc.parallelize((1 to 1000).map(TestInteger(_)), 10).persist(MEMORY_ONLY_2)
> data.count
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data.id).size).sum
> This shows only 10 blocks instead of the expected 20 (10 partitions, 2 replicas each).
> Block replication fails on the executors with a java.lang.RuntimeException: java.lang.ClassNotFoundException: $line14.$read$$iw$$iw$TestInteger
> val data1 = sc.parallelize(1 to 1000, 10).persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_2)
> data1.count
> Block replication again fails with the following errors:
> 16/07/14 14:50:40 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 8567643992794608648
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
> at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
> at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213)
> at org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:775)
> at org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:753)
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data1.id).size).sum
> This again shows only 10 blocks.
> Caching serialized data works for primitive types, but not for custom classes:
> val data3 = sc.parallelize(1 to 1000, 10).persist(MEMORY_ONLY_SER_2)
> data3.count
> This replicates as intended. However,
> val data4 = sc.parallelize((1 to 1000).map(TestInteger(_)), 10).persist(MEMORY_ONLY_SER_2)
> data4.count
> again does not replicate data, and the executors show the same ClassNotFoundException.
> These examples worked fine and showed expected results with Spark 1.6.2
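The Kryo error in the description ("Encountered unregistered class ID: 13994") means the deserializing side read a numeric class ID that has no registration on its side. The toy model below illustrates how ID-based class resolution fails when the writer and reader disagree about registrations. It is plain Scala and is NOT Kryo's implementation. Registry, roundTrip and the IDs used are all hypothetical. It is not a diagnosis of this bug.

```scala
// Toy model of ID-based class resolution. This is NOT Kryo's implementation;
// all names and IDs here are hypothetical.
object ClassIdToyModel {
  // A registry maps small integer IDs to class names.
  final case class Registry(ids: Map[Int, String]) {
    def idOf(className: String): Option[Int] =
      ids.collectFirst { case (id, name) if name == className => id }

    def classFor(id: Int): Either[String, String] =
      ids.get(id).toRight(s"Encountered unregistered class ID: $id")
  }

  // The writer encodes the class as its own ID; the reader decodes that ID
  // using its own registry.
  def roundTrip(writer: Registry, reader: Registry, className: String): Either[String, String] =
    writer.idOf(className) match {
      case Some(id) => reader.classFor(id)
      case None     => Left(s"Writer has no ID for $className")
    }

  // The writer knows two classes; otherReader only knows the first one.
  val writer      = Registry(Map(10 -> "java.lang.Integer", 11 -> "scala.Tuple2"))
  val otherReader = Registry(Map(10 -> "java.lang.Integer"))

  def main(args: Array[String]): Unit = {
    println(roundTrip(writer, writer, "scala.Tuple2"))      // same registry on both sides
    println(roundTrip(writer, otherReader, "scala.Tuple2")) // reader lacks ID 11
  }
}
```

Running main prints Right(scala.Tuple2) followed by Left(Encountered unregistered class ID: 11).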
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)