Posted to issues@spark.apache.org by "Michael Kamprath (Jira)" <ji...@apache.org> on 2021/03/05 18:14:00 UTC

[jira] [Commented] (SPARK-34563) Checkpointing a union with another checkpoint fails

    [ https://issues.apache.org/jira/browse/SPARK-34563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17296250#comment-17296250 ] 

Michael Kamprath commented on SPARK-34563:
------------------------------------------

I just tested this under Spark 3.1.1, keeping everything else in my setup the same, and it fails at the same point. However, the exception thrown looks slightly different: instead of the unregistered-class-ID KryoException seen under 3.0.2, it is now a java.lang.IndexOutOfBoundsException raised from Kryo's MapReferenceResolver:

{code:java}
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-2-ea419227c865> in <module>
      8     print('Processing i = {0}'.format(i))
      9     new_df = spark.range(RANGE_STEP*i + 1, RANGE_STEP*(i+1) + 1, numPartitions=PARTITIONS)
---> 10     df = df.union(new_df).checkpoint()
     11 
     12 df.count()

/usr/spark-3.1.1/python/pyspark/sql/dataframe.py in checkpoint(self, eager)
    544         This API is experimental.
    545         """
--> 546         jdf = self._jdf.checkpoint(eager)
    547         return DataFrame(jdf, self.sql_ctx)
    548 

/usr/spark-3.1.1/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/usr/spark-3.1.1/python/pyspark/sql/utils.py in deco(*a, **kw)
    109     def deco(*a, **kw):
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
    113             converted = convert_exception(e.java_exception)

/usr/spark-3.1.1/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o65.checkpoint.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 2.0 failed 4 times, most recent failure: Lost task 8.3 in stage 2.0 (TID 50) (10.20.30.17 executor 3): java.lang.IndexOutOfBoundsException: Index: 61, Size: 0
	at java.util.ArrayList.rangeCheck(ArrayList.java:659)
	at java.util.ArrayList.get(ArrayList.java:435)
	at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:60)
	at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:857)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:811)
	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:296)
	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1866)
	at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1253)
	at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1253)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1253)
	at org.apache.spark.sql.Dataset.$anonfun$checkpoint$1(Dataset.scala:697)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:688)
	at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:651)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: Index: 61, Size: 0
	at java.util.ArrayList.rangeCheck(ArrayList.java:659)
	at java.util.ArrayList.get(ArrayList.java:435)
	at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:60)
	at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:857)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:811)
	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:296)
	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1866)
	at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1253)
	at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1253)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
{code}
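
Both the 3.0.2 and 3.1.1 failures surface inside KryoDeserializationStream while the previous checkpoint is read back, so the problem appears to be in how the checkpointed rows are (de)serialized rather than in the union itself. One mitigation worth trying (sketched below, untested against this setup) is to take Kryo out of the serialization path for the session that does the checkpointing; the configuration keys are standard Spark settings, and the checkpoint path shown is illustrative only:

{code:python}
# Hypothetical mitigation sketch, not a confirmed fix: both stack traces fail
# while Kryo deserializes checkpoint blocks, so try running the repro with the
# default JavaSerializer (or with Kryo reference tracking disabled).
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("checkpoint-union-repro")
    # Fall back to Spark's default serializer instead of Kryo ...
    .config("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    # ... or, if Kryo is required, try disabling reference tracking, since the
    # 3.1.1 failure is raised from Kryo's MapReferenceResolver:
    # .config("spark.kryo.referenceTracking", "false")
    .getOrCreate()
)

# A checkpoint directory must be set before checkpoint() is called; the path
# here is illustrative only.
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")
{code}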

> Checkpointing a union with another checkpoint fails
> ---------------------------------------------------
>
>                 Key: SPARK-34563
>                 URL: https://issues.apache.org/jira/browse/SPARK-34563
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.0.2
>         Environment: I am running Spark 3.0.2 in standalone cluster mode, built for Hadoop 2.7 and Scala 2.12.12. I am using QFS 2.2.2 (Quantcast File System) as the underlying DFS. The nodes run Debian Stretch, and Java is openjdk version "1.8.0_275".
>            Reporter: Michael Kamprath
>            Priority: Major
>
> I have some PySpark code that periodically checkpoints a data frame that I am building in pieces, unioning those pieces together as they are constructed. (Py)Spark fails on the second checkpoint, which unions a new piece of the data frame with a previously checkpointed piece. Simplified PySpark code that triggers the problem:
>  
> {code:python}
> # Assumes spark is an active SparkSession and that a checkpoint directory has
> # already been set via spark.sparkContext.setCheckpointDir(...); see the note
> # after the stack trace below.
> RANGE_STEP = 10000
> PARTITIONS = 5
> COUNT_UNIONS = 20
> df = spark.range(1, RANGE_STEP+1, numPartitions=PARTITIONS)
> for i in range(1, COUNT_UNIONS+1):
>     print('Processing i = {0}'.format(i))
>     new_df = spark.range(RANGE_STEP*i + 1, RANGE_STEP*(i+1) + 1, numPartitions=PARTITIONS)
>     df = df.union(new_df).checkpoint()
> df.count()
> {code}
> When this code gets to the checkpoint on the second loop iteration (i=2), the job fails with the following error:
>  
> {code:java}
> Py4JJavaError: An error occurred while calling o119.checkpoint.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 10.0 failed 4 times, most recent failure: Lost task 9.3 in stage 10.0 (TID 264, 10.20.30.13, executor 0): com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 9062
> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
> 	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:804)
> 	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:296)
> 	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168)
> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
> 	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> 	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1804)
> 	at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1227)
> 	at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1227)
> 	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2154)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:127)
> 	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> Driver stacktrace:
> 	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
> 	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
> 	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
> 	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> 	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> 	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
> 	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
> 	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
> 	at scala.Option.foreach(Option.scala:407)
> 	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
> 	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> 	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2179)
> 	at org.apache.spark.rdd.RDD.count(RDD.scala:1227)
> 	at org.apache.spark.sql.Dataset.$anonfun$checkpoint$1(Dataset.scala:696)
> 	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
> 	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
> 	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
> 	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
> 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
> 	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
> 	at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:687)
> 	at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:650)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> 	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> 	at py4j.Gateway.invoke(Gateway.java:282)
> 	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> 	at py4j.commands.CallCommand.execute(CallCommand.java:79)
> 	at py4j.GatewayConnection.run(GatewayConnection.java:238)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 9062
> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
> 	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:804)
> 	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:296)
> 	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168)
> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
> 	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> 	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1804)
> 	at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1227)
> 	at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1227)
> 	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2154)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:127)
> 	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	... 1 more
> {code}
>  
> Note that the checkpoint directory is set; the first checkpoint does succeed. Also, if the checkpoint() call is removed, the sample code succeeds as expected, so the problem is isolated to the use of checkpoint.
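> For completeness, the directory is configured before the loop runs, along these lines (the actual QFS path is omitted; the one below is a placeholder):
> {code:python}
> # Checkpoint directory setup assumed by the repro above. setCheckpointDir()
> # must be called before the first checkpoint(); the qfs:// URI is a placeholder.
> spark.sparkContext.setCheckpointDir('qfs://namenode:20000/checkpoints')
> {code}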
>  


