You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Michael Kamprath (Jira)" <ji...@apache.org> on 2021/03/05 18:14:00 UTC
[jira] [Commented] (SPARK-34563) Checkpointing a union with another
checkpoint fails
[ https://issues.apache.org/jira/browse/SPARK-34563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17296250#comment-17296250 ]
Michael Kamprath commented on SPARK-34563:
------------------------------------------
I just tested this under Spark 3.1.1, keeping everything else in my setup the same, and it fails at the same point. However, the exception thrown looks slightly different:
{code:java}
Py4JJavaError Traceback (most recent call last)
<ipython-input-2-ea419227c865> in <module>
8 print('Processing i = {0}'.format(i))
9 new_df = spark.range(RANGE_STEP*i + 1, RANGE_STEP*(i+1) + 1, numPartitions=PARTITIONS)
---> 10 df = df.union(new_df).checkpoint()
11
12 df.count()
/usr/spark-3.1.1/python/pyspark/sql/dataframe.py in checkpoint(self, eager)
544 This API is experimental.
545 """
--> 546 jdf = self._jdf.checkpoint(eager)
547 return DataFrame(jdf, self.sql_ctx)
548
/usr/spark-3.1.1/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/usr/spark-3.1.1/python/pyspark/sql/utils.py in deco(*a, **kw)
109 def deco(*a, **kw):
110 try:
--> 111 return f(*a, **kw)
112 except py4j.protocol.Py4JJavaError as e:
113 converted = convert_exception(e.java_exception)
/usr/spark-3.1.1/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o65.checkpoint.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 2.0 failed 4 times, most recent failure: Lost task 8.3 in stage 2.0 (TID 50) (10.20.30.17 executor 3): java.lang.IndexOutOfBoundsException: Index: 61, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:659)
at java.util.ArrayList.get(ArrayList.java:435)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:60)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:857)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:811)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:296)
at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1866)
at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1253)
at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1253)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
at org.apache.spark.rdd.RDD.count(RDD.scala:1253)
at org.apache.spark.sql.Dataset.$anonfun$checkpoint$1(Dataset.scala:697)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:688)
at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:651)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: Index: 61, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:659)
at java.util.ArrayList.get(ArrayList.java:435)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:60)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:857)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:811)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:296)
at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1866)
at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1253)
at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1253)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
{code}
> Checkpointing a union with another checkpoint fails
> ---------------------------------------------------
>
> Key: SPARK-34563
> URL: https://issues.apache.org/jira/browse/SPARK-34563
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 3.0.2
> Environment: I am running Spark 3.0.2 in standalone cluster mode, built for Hadoop 2.7, and Scala 2.12.12. I am using QFS 2.2.2 (Quantcast File System) as the underlying DFS. The nodes run on Debian Stretch, and Java is openjdk version "1.8.0_275".
> Reporter: Michael Kamprath
> Priority: Major
>
> I have some PySpark code that periodically checkpoints a data frame that I am building in pieces by union-ing those pieces together as they are constructed. (Py)Spark fails on the second checkpoint, which would be a union of a new piece of the desired data frame with a previously checkpointed piece. Some simplified PySpark code that will trigger this problem is:
>
> {code:java}
> RANGE_STEP = 10000
> PARTITIONS = 5
> COUNT_UNIONS = 20
> df = spark.range(1, RANGE_STEP+1, numPartitions=PARTITIONS)
> for i in range(1, COUNT_UNIONS+1):
>     print('Processing i = {0}'.format(i))
>     new_df = spark.range(RANGE_STEP*i + 1, RANGE_STEP*(i+1) + 1, numPartitions=PARTITIONS)
>     df = df.union(new_df).checkpoint()
> df.count()
> {code}
> When this code gets to the checkpoint on the second loop iteration (i=2) the job fails with an error:
>
> {code:java}
> Py4JJavaError: An error occurred while calling o119.checkpoint.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 10.0 failed 4 times, most recent failure: Lost task 9.3 in stage 10.0 (TID 264, 10.20.30.13, executor 0): com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 9062
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:804)
> at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:296)
> at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1804)
> at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1227)
> at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1227)
> at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2154)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:127)
> at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
> at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
> at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
> at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
> at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
> at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
> at scala.Option.foreach(Option.scala:407)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2179)
> at org.apache.spark.rdd.RDD.count(RDD.scala:1227)
> at org.apache.spark.sql.Dataset.$anonfun$checkpoint$1(Dataset.scala:696)
> at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
> at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
> at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
> at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
> at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
> at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:687)
> at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:650)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:282)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 9062
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:804)
> at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:296)
> at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1804)
> at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1227)
> at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1227)
> at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2154)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:127)
> at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ... 1 more
> {code}
>
> Note that the checkpoint directory is set, as the first checkpoint does succeed. Also, if the checkpoint method is removed, the sample code succeeds as expected, so the problem is isolated to the use of checkpoint.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org