Posted to user@spark.apache.org by Tan Tim <un...@gmail.com> on 2014/09/18 09:41:39 UTC

some trouble with repartition

Hi, all

For performance reasons, I use repartition in Spark [see this
<http://apache-spark-user-list.1001560.n3.nabble.com/why-a-machine-learning-application-run-slowly-on-the-spark-cluster-td10910.html>].
This job sometimes runs successfully, but today I ran into some trouble:

*[stages from the web UI]* (screenshot not preserved in this plain-text version)


*[error log]*
java.io.FileNotFoundException (java.io.FileNotFoundException:
/tmp/spark-local-20140918122216-335f/2f/shuffle_0_78_17 (No such file or
directory))
java.io.RandomAccessFile.open(Native Method)
java.io.RandomAccessFile.<init>(Unknown Source)
org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:87)
org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:105)
org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:265)
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:205)
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.getLocalBlocks(BlockFetcherIterator.scala:204)
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.initialize(BlockFetcherIterator.scala:235)
org.apache.spark.storage.BlockManager.getMultiple(BlockManager.scala:452)
org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:77)
org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:61)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:90)
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:89)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:75)
org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
org.apache.spark.scheduler.Task.run(Task.scala:53)
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
java.security.AccessController.doPrivileged(Native Method)
javax.security.auth.Subject.doAs(Unknown Source)
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
java.lang.Thread.run(Unknown Source)

*[the Spark conf settings]*
val conf = new SparkConf()
  .setMaster(sparkMaster)
  .setAppName("ModelTraining")
  .setSparkHome(sparkHome)
  .setJars(List(jarFile))
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "LRRegistrator")
conf.set("spark.storage.memoryFraction", "0.7")
conf.set("spark.executor.memory", "8g")
conf.set("spark.cores.max", "150")
conf.set("spark.speculation", "true")
conf.set("spark.storage.blockManagerHeartBeatMs", "300000")

val sc = new SparkContext(conf)
val lines = sc.textFile("hdfs://xxx:52310" + inputPath, 3)
val trainset = lines
  .map(parseWeightedPoint)
  .repartition(50)
  .persist(StorageLevel.MEMORY_ONLY)
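
For context, parseWeightedPoint is a plain line parser along these lines (a
sketch only; the whitespace-separated "label f1 ... fn" record layout is an
assumption for illustration):

// Hypothetical input format: "label f1 f2 ... fn", whitespace-separated.
def parseWeightedPoint(line: String): WeightedPoint = {
  val fields = line.trim.split("\\s+")
  WeightedPoint(fields(0).toDouble, fields.drop(1).map(_.toDouble))
}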

I don't know why this error appears. Any suggestions? Thanks

tim tan