Posted to issues@spark.apache.org by "zhao yufei (JIRA)" <ji...@apache.org> on 2018/11/08 03:43:00 UTC

[jira] [Resolved] (SPARK-25969) pyspark deal with large data memory issues

     [ https://issues.apache.org/jira/browse/SPARK-25969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhao yufei resolved SPARK-25969.
--------------------------------
    Resolution: Resolved

> pyspark deal with large data memory issues
> ------------------------------------------
>
>                 Key: SPARK-25969
>                 URL: https://issues.apache.org/jira/browse/SPARK-25969
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.2
>            Reporter: zhao yufei
>            Priority: Major
>
> I am using PySpark to load a large CSV file of about 1.4 million lines. Each line contains two fields: imageId and kws (the image keywords, separated by ',').
>  
> When I run the following code, it fails with an OutOfMemoryError:
> {code}
> import numpy as np
> from pyspark import StorageLevel
>
> # spark, schema, imagesKwsFilePath and kwsIndexMap_broadcast are defined elsewhere.
> df_imageIdsKws = spark.read.format('com.databricks.spark.csv').options(delimiter="\t", header='true').schema(schema=schema).load(imagesKwsFilePath)
> numClass = 1868
>
> def mapRow(row):
>     imageId = row.imageId
>     # Dense multi-hot vector with one slot per keyword class.
>     hotVector = np.zeros((numClass,), dtype=float)
>     for kw in row.kws.split(','):
>         kwIndex = kwsIndexMap_broadcast.value.get(kw)
>         hotVector[int(kwIndex)] = 1.0
>     # Return only after every keyword of the row has been marked.
>     return (imageId, hotVector.tolist())
>
> df_imageIdsKws = df_imageIdsKws.rdd.persist(storageLevel=StorageLevel.DISK_ONLY)
> imageIdsKws_rdd_ = df_imageIdsKws.map(lambda row: mapRow(row)).persist(storageLevel=StorageLevel.DISK_ONLY)
> {code}
> Even with DISK_ONLY for all RDDs, the job still runs out of memory.
> However, when I set numClass=1 as a test, everything works.
> The executor log shows the following error:
> {code:java}
> java.lang.OutOfMemoryError: Java heap space
>         at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>         at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>         at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:431)
>         at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:431)
>         at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
>         at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
>         at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:351)
>         at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:336)
>         at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:336)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1381)
>         at org.apache.spark.util.Utils$.copyStream(Utils.scala:357)
>         at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:436)
>         at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:62)
>         at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>         at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:223)
>         at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:439)
>         at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:247)
>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1992)
>         at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
> 2018-11-08 10:53:06 ERROR SparkUncaughtExceptionHandler:91 - Uncaught exception in thread Thread[stdout writer for /data/anaconda3/bin/python3.5,5,main]
> {code}
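
For scale, here is a rough back-of-envelope sketch of why numClass=1868 behaves so differently from numClass=1. It is not a diagnosis of the stack trace above and it is not the reporter's code. It only counts the raw float64 payload of the dense vectors (Python list and serialization overhead would come on top) and uses the "about 1.4 million" row count from the description. The helpers to_sparse_indices and to_dense are hypothetical names showing one possible alternative: keep only the indices of the set slots and expand them later. Unlike the original mapRow, this sketch skips keywords missing from the index map, which is an assumption.

```python
NUM_ROWS = 1400000        # "about 1.4 million" lines, per the description
BYTES_PER_FLOAT64 = 8     # raw payload only; object and pickle overhead is extra


def dense_bytes(num_rows, num_class):
    """Raw float64 payload when every row carries a dense vector of num_class slots."""
    return num_rows * num_class * BYTES_PER_FLOAT64


def to_sparse_indices(kws, index_map):
    """Hypothetical alternative encoding: sorted indices of the keywords present.

    Keywords that are not in index_map are skipped (an assumption of this sketch).
    """
    return sorted({index_map[kw] for kw in kws.split(',') if kw in index_map})


def to_dense(indices, num_class):
    """Expand sparse indices back into a dense multi-hot list when it is needed."""
    vec = [0.0] * num_class
    for i in indices:
        vec[i] = 1.0
    return vec


if __name__ == "__main__":
    for num_class in (1, 1868):
        gib = dense_bytes(NUM_ROWS, num_class) / float(2 ** 30)
        print("numClass={}: about {:.2f} GiB of raw float64 data".format(num_class, gib))
```

With a plain dict standing in for the broadcast map, to_sparse_indices("cat,dog,cat", {"cat": 2, "dog": 0}) yields the two set indices, and to_dense turns them back into a full vector only at the point where a dense representation is actually required.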



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
