Posted to issues@spark.apache.org by "Qiang Wang (Jira)" <ji...@apache.org> on 2019/08/30 07:51:00 UTC
[jira] [Created] (SPARK-28926) CLONE - ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances
Qiang Wang created SPARK-28926:
----------------------------------
Summary: CLONE - ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances
Key: SPARK-28926
URL: https://issues.apache.org/jira/browse/SPARK-28926
Project: Spark
Issue Type: Bug
Components: MLlib
Affects Versions: 2.2.1
Reporter: Qiang Wang
Assignee: Xiangrui Meng
The stack trace is below:
{quote}
19/08/28 07:00:40 WARN Executor task launch worker for task 325074 BlockManager: Block rdd_10916_493 could not be removed as it was not found on disk or in memory
19/08/28 07:00:41 ERROR Executor task launch worker for task 325074 Executor: Exception in task 3.0 in stage 347.1 (TID 325074)
java.lang.ArrayIndexOutOfBoundsException: 6741
    at org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1460)
    at org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1440)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1041)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1032)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:972)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1032)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:763)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:137)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:358)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{quote}
This exception occurs intermittently. We also found that the AUC metric is not stable when evaluating the inner product of the user factors and the item factors with the same dataset and configuration. Across runs, AUC varied from 0.60 to 0.67, which is too unstable for a production environment.
Dataset size: ~12 billion ratings
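For readers unfamiliar with the metric, the evaluation described above (scoring a user/item pair by the inner product of its factors, then computing AUC over the scored pairs) can be sketched roughly as follows. This is a minimal, Spark-free illustration and not the evaluation code from the job itself: the names AucSketch, dot and auc are hypothetical, and it assumes each scored pair carries a boolean relevance label and that the data fits in memory, which is not true for a 12-billion-rating dataset.

{code:scala}
object AucSketch {
  // Inner product of a user factor vector and an item factor vector.
  def dot(u: Array[Float], v: Array[Float]): Double = {
    require(u.length == v.length, "factor vectors must have the same rank")
    var s = 0.0
    var i = 0
    while (i < u.length) { s += u(i).toDouble * v(i).toDouble; i += 1 }
    s
  }

  // AUC via the rank-sum (Mann-Whitney) formulation.
  // Each element is (score, isPositive); tied scores share their average rank.
  def auc(scored: Seq[(Double, Boolean)]): Double = {
    val nPos = scored.count(_._2).toDouble
    val nNeg = scored.size - nPos
    require(nPos > 0 && nNeg > 0, "need at least one positive and one negative")
    val sorted = scored.sortBy(_._1).toArray
    var rankSumPos = 0.0
    var i = 0
    while (i < sorted.length) {
      // Find the end (exclusive) of the run of equal scores starting at i.
      var j = i
      while (j < sorted.length && sorted(j)._1 == sorted(i)._1) j += 1
      // Positions i..j-1 hold 1-based ranks i+1..j, whose average is (i+1+j)/2.
      val avgRank = (i + 1 + j) / 2.0
      var k = i
      while (k < j) { if (sorted(k)._2) rankSumPos += avgRank; k += 1 }
      i = j
    }
    (rankSumPos - nPos * (nPos + 1) / 2.0) / (nPos * nNeg)
  }
}
{code}

In such a sketch, the score for a pair would be AucSketch.dot(userFactor, itemFactor), and AucSketch.auc would be applied to the resulting (score, label) pairs. Running the same computation over the factors from two separate fits is one way to quantify the run-to-run variation reported here.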
Here is our code:
{code:java}
val hivedata = sc.sql(sqltext).select(id,dpid,score).coalesce(numPartitions)
val predataItem = hivedata.rdd.map(r=>(r._1._1,(r._1._2,r._2.sum)))
  .groupByKey().zipWithIndex()
  .persist(StorageLevel.MEMORY_AND_DISK_SER)
val predataUser = predataItem.flatMap(r=>r._1._2.map(y=>(y._1,(r._2.toInt,y._2))))
  .aggregateByKey(zeroValueArr,numPartitions)((a,b)=> a += b,(a,b)=>a ++ b).map(r=>(r._1,r._2.toIterable))
  .zipWithIndex().persist(StorageLevel.MEMORY_AND_DISK_SER)
// x._2 is the item_id, y._1 is the user_id, y._2 is the rating
val trainData = predataUser.flatMap(x => x._1._2.map(y => (x._2.toInt, y._1, y._2.toFloat)))
  .setName(trainDataName).persist(StorageLevel.MEMORY_AND_DISK_SER)
case class ALSData(user:Int, item:Int, rating:Float) extends Serializable
val ratingData = trainData.map(x => ALSData(x._1, x._2, x._3)).toDF()
val als = new ALS
val paramMap = ParamMap(als.alpha -> 25000).
  put(als.checkpointInterval, 5).
  put(als.implicitPrefs, true).
  put(als.itemCol, "item").
  put(als.maxIter, 60).
  put(als.nonnegative, false).
  put(als.numItemBlocks, 600).
  put(als.numUserBlocks, 600).
  put(als.regParam, 4.5).
  put(als.rank, 25).
  put(als.userCol, "user")
als.fit(ratingData, paramMap)
{code}