Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 03:59:38 UTC

[jira] [Updated] (SPARK-19243) Error when selecting from DataFrame containing parsed data from files larger than 1MB

     [ https://issues.apache.org/jira/browse/SPARK-19243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-19243:
---------------------------------
    Labels: bulk-closed  (was: )

> Error when selecting from DataFrame containing parsed data from files larger than 1MB
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-19243
>                 URL: https://issues.apache.org/jira/browse/SPARK-19243
>             Project: Spark
>          Issue Type: Bug
>            Reporter: Ben
>            Priority: Major
>              Labels: bulk-closed
>
> I hope I can describe the problem clearly. This error happens with Spark 2.0.1. I also tried Spark 2.1.0 on my test PC and it worked there with none of the issues below, but I can't try it on the test cluster because Spark would need to be upgraded there first. I'm opening this ticket because, if it is a bug, part of it may still be present in Spark 2.1.0.
> Initially I thought it was my script's problem, so I tried to debug it until I found why this is happening.
> Step by step: I load XML files through spark-xml into a DataFrame. In my case, the rowTag is the root tag, so each XML file creates one row. The XML structure is fairly complex, and its elements are converted to nested columns or arrays inside the DF. I need to flatten the whole table, and the output is not fixed: I dynamically select what I want as output. So when I need to output columns that have been parsed as arrays, I explode them with explode(), and only when needed.
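To make the flattening step concrete, here is a minimal sketch in plain Python that stands in for what explode() does to a row holding array columns. It is not the reporter's script and does not use Spark; the helper `explode` and the field names `file`, `orders` and `tags` are hypothetical. It only illustrates that each explode emits one row per array element, so chained explodes multiply the row count.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def explode(rows: List[Row], column: str) -> List[Row]:
    """Emit one output row per element of the array stored in `column`.

    Stand-in for Spark SQL's explode() on non-null arrays: a row whose
    array is empty produces no output rows.
    """
    out: List[Row] = []
    for row in rows:
        for item in row[column]:
            new_row = dict(row)      # copy every other column unchanged
            new_row[column] = item   # replace the array by a single element
            out.append(new_row)
    return out


# One input row, as when rowTag is the root tag of a single XML file.
# The field names are hypothetical.
rows = [{"file": "a.xml", "orders": [1, 2, 3], "tags": ["x", "y"]}]

once = explode(rows, "orders")   # 3 rows
twice = explode(once, "tags")    # 3 * 2 = 6 rows
```

Because every non-exploded column is copied into each output row, the flattened table can be much larger than the source file it was parsed from.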
> Normally I can select various columns that don't have many entries without a problem. 
> I select a column that has a lot of entries into a new DF, e.g. simply through
> {noformat}
> df2 = df.select(...)
> {noformat}
> and then if I try to do a count() or first() or anything, Spark behaves in one of two ways:
> 1. If the source file was smaller than 1MB, it works.
> 2. If the source file was larger than 1MB, the following error occurs:
> {noformat}
> Traceback (most recent call last):
>   File "/myCode.py", line 71, in main
>     df.count()
>   File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 299, in count
>     return int(self._jdf.count())
>   File "/usr/hdp/current/spark-client/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
>     return f(*a, **kw)
>   File "/usr/hdp/current/spark-client/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
>     format(target_id, ".", name), value)
> Py4JJavaError: An error occurred while calling o180.count.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 6, compname): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
>   at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
>   at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1307)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
>   at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:438)
>   at org.apache.spark.storage.BlockManager.get(BlockManager.scala:606)
>   at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:663)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
>   at scala.Option.foreach(Option.scala:257)
>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
>   at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347)
>   at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
>   at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2526)
>   at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2523)
>   at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2523)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
>   at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
>   at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2523)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
>   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>   at py4j.Gateway.invoke(Gateway.java:280)
>   at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at py4j.GatewayConnection.run(GatewayConnection.java:214)
>   at java.lang.Thread.run(Thread.java:745)
>   
> Caused by: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
>   at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
>   at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1307)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
>   at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:438)
>   at org.apache.spark.storage.BlockManager.get(BlockManager.scala:606)
>   at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:663)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   ... 1 more
> {noformat}
> It happens not only when calling count(), but with any operation.
> As mentioned, this only happens when there are many entries. In my case it happens in only one configuration: when the column with the most entries is selected. That column has far more entries than the others, and its entries were in an array before being exploded. Otherwise it works fine, independently of the file size.
> The same thing happens if the source is an archive containing the XML files. If the archive itself is larger than 1MB, the error occurs; if it is smaller, it works.
> The Spark log shows that the error occurs when calling e.g. count(), although in my script's log the error appears after the select(...) command starts; maybe that's because of the parallel processing. Consequently, I'm not sure whether the error happens during the explode(), the select(), or for some reason after the DF has been prepared and I call e.g. count().
> I have allocated more than enough memory. On another system I got a `Java heap space` error, which I guess occurred on the way to the actual error.
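For context on the exception itself: the trace shows DiskStore.getBytes calling FileChannelImpl.map, and Java's FileChannel.map rejects any mapping larger than Integer.MAX_VALUE bytes (just under 2 GiB). So the failure indicates that a single block read back from disk exceeded that size. The sketch below only works through that arithmetic in plain Python. The helper names and the 128 MiB target block size are assumptions for illustration, not Spark settings, and spreading the data over more partitions is a commonly suggested mitigation rather than a confirmed fix for this ticket.

```python
INT_MAX = 2**31 - 1  # java.lang.Integer.MAX_VALUE, in bytes


def fits_in_one_mapping(block_bytes: int) -> bool:
    """True if a block of this size can be mapped by one FileChannel.map call."""
    return 0 <= block_bytes <= INT_MAX


def min_partitions(total_bytes: int, target_block_bytes: int = 128 * 1024**2) -> int:
    """Smallest partition count that keeps the average block at or below the target.

    `target_block_bytes` is a hypothetical tuning value and must itself
    stay within the Integer.MAX_VALUE limit.
    """
    if total_bytes < 0:
        raise ValueError("total_bytes must be non-negative")
    if not 0 < target_block_bytes <= INT_MAX:
        raise ValueError("target_block_bytes must be in (0, Integer.MAX_VALUE]")
    # Integer ceiling division, with at least one partition.
    return max(1, -(-total_bytes // target_block_bytes))


# Example: 10 GiB of exploded data held in a single partition cannot be
# mapped in one call, but 80 partitions of 128 MiB each can.
ten_gib = 10 * 1024**3
single_ok = fits_in_one_mapping(ten_gib)
needed = min_partitions(ten_gib)
```

The count is based on the average block size, so skewed data (for example, one file that explodes into far more rows than the others) can still produce an oversized block and would need a larger partition count.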


