Posted to issues@spark.apache.org by "Sean R. Owen (Jira)" <ji...@apache.org> on 2020/12/07 15:48:00 UTC

[jira] [Resolved] (SPARK-33373) A serialized ImputerModel fails to be serialized again

     [ https://issues.apache.org/jira/browse/SPARK-33373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean R. Owen resolved SPARK-33373.
----------------------------------
    Resolution: Not A Problem

> A serialized ImputerModel fails to be serialized again
> ------------------------------------------------------
>
>                 Key: SPARK-33373
>                 URL: https://issues.apache.org/jira/browse/SPARK-33373
>             Project: Spark
>          Issue Type: Bug
>          Components: ML
>    Affects Versions: 2.4.3
>         Environment: * Python 3.7.3
>  * (Py)Spark 2.4.3
>            Reporter: Andre Boechat
>            Priority: Major
>
> After loading an {{ImputerModel}} from disk, the instance fails to save itself again.
> h2. Code Sample
> {code:python}
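>     from pyspark.ml.feature import Imputer, ImputerModel
>     from pyspark.sql import SparkSession
>
>     # Reuse the active session, or start one if none exists.
>     sparksession = SparkSession.builder.getOrCreate()
>     df = sparksession.createDataFrame(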
>         [
>             (2.0, 3.0),
>             (2.0, 1.0),
>             (2.0, None),
>             (None, 0.0)
>         ],
>         ["x200", "x3"]
>     ).repartition(1)
>     i = Imputer(inputCols=["x200", "x3"], outputCols=["x200_i", "x3_i"]).fit(
>         df
>     )
>     tdf = i.transform(df)
>     fpath = "/tmp/bucketpath"
>     i.write().overwrite().save(fpath)
>     li = ImputerModel.load(fpath)
>     t2df = li.transform(df)
>     assert all(
>         r1.asDict() == r2.asDict() for r1, r2 in zip(
>             tdf.collect(), t2df.collect()
>         )
>     )
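>     # Saving the loaded model back to the same path raises Py4JJavaError
>     # (caused by java.io.FileNotFoundException, see the stacktrace).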
>     li.write().overwrite().save(fpath)
> {code}
> h2. Stacktrace
> {code:python}
> --> 480     li.write().overwrite().save(fpath)                                                                                                                                                                                                                                                                                                                             
>                                                                                                                                                                                                                                                                                                                                                                            
> /usr/spark-2.4.3/python/pyspark/ml/util.py in save(self, path)                                                                                                                       
>     181         if not isinstance(path, basestring):                                                                                                                                 
>     182             raise TypeError("path should be a basestring, got type %s" % type(path))                                                                                         
> --> 183         self._jwrite.save(path)                                                                                                                                              
>     184                                                                                                                                                                              
>     185     def overwrite(self):                                                                                                                                                                                                                                                                                                                                           
>                                                                                                                                                                                      
> /usr/local/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)                                                                                                 
>    1284         answer = self.gateway_client.send_command(command)                                                                                                                   
>    1285         return_value = get_return_value(                                                                                                                                                                                                                                                                                                                           
> -> 1286             answer, self.gateway_client, self.target_id, self.name)                                                                                                                                                                                                                                                                                                
>    1287                                                                                                                                                                              
>    1288         for temp_arg in temp_args:                                                                                                                                           
>                                                                                                                                                                                      
> /usr/spark-2.4.3/python/pyspark/sql/utils.py in deco(*a, **kw)                                                                                                                       
>      61     def deco(*a, **kw):                                                                                                                                                      
>      62         try:                                                                                                                                                                 
> ---> 63             return f(*a, **kw)                                                                                                                                               
>      64         except py4j.protocol.Py4JJavaError as e:                                                                                                                             
>      65             s = e.java_exception.toString()                                                                                                                                  
>                                                                                                                                                                                      
> /usr/local/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)                                                                 
>     326                 raise Py4JJavaError(                                                                                                                                         
>     327                     "An error occurred while calling {0}{1}{2}.\n".                                                                                                          
> --> 328                     format(target_id, ".", name), value)                                                                                                                                                                                                                                                                                                           
>     329             else:                                                                                                                                                            
>     330                 raise Py4JError(                                                                                                                                             
>                                                                                                                                                                                      
> Py4JJavaError: An error occurred while calling o572.save.
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)                                                                            
>         at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)                                             
>         at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)                                                                                                                                                                                                                                                   
>         at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)                                                                                                                                                                                                                                                              
>         at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)                                                                               
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)                                                                                    
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)                                                                                    
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)                                                                               
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)                                                                                            
>         at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)                                                                                                                                                                                                                                                                                      
>         at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)                                                                                                     
>         at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)                                                                                   
>         at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)                                                                                              
>         at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)                                                                                                                                                                                                                                                                     
>         at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)                                                                                                                                                                                                                                                                     
>         at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)                                                                    
>         at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)                                                                                
>         at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)                                                                                    
>         at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)                                                                                                
>         at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)                                                                                            
>         at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)                                                                                                      
>         at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)                                                                                                      
>         at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)                                                                                                   
>         at org.apache.spark.ml.feature.ImputerModel$ImputerModelWriter.saveImpl(Imputer.scala:252)                                                                                   
>         at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:180)                                                                                                               
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)                                                                                                               
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)                                                                                             
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)                                                                                     
>         at java.lang.reflect.Method.invoke(Method.java:498)                                                                                                                                                                                                                                                                                                                
>         at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)                                                                                                              
>         at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)                                                                                                        
>         at py4j.Gateway.invoke(Gateway.java:282)                                                                                                                                     
>         at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)                                                                                                      
>         at py4j.commands.CallCommand.execute(CallCommand.java:79)                                                                                                                    
>         at py4j.GatewayConnection.run(GatewayConnection.java:238)                                                                                                                    
>         at java.lang.Thread.run(Thread.java:748)                                                                                                                                     
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 63.0 failed 1 times, most recent failure: Lost task 0.0 in stage 63.0 (TID 192, localhost, executor driver): java.io.FileNotFoundException: File file:/tmp/bucketpath/data/part-00000-0e35e712-a3f9-451a-856b-15dc690569ad-c000.snappy.parquet does not exist                
> It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.                                                                                                                                                               
>         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)       
>         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)                                                                        
>         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)                                                                             
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)                                              
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)                                                    
>         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)                                                                                                                                                                                                                                                                         
>         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)                                                         
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)                                                                                                            
>         at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)                                                                   
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)                                                                                                                                                                                                                                                                                      
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)                                                                                                                                                                                                                                                                                      
>         at org.apache.spark.scheduler.Task.run(Task.scala:121)                                                                                                                       
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)                                                                                       
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)                                                                                                         
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)                                                                                                     
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)                                                                                           
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)                                                                                           
>         at java.lang.Thread.run(Thread.java:748)
> Driver stacktrace:                                                                                                                                                                   
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)                                     
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)                                                                              
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)                                                                              
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)                                                                                            
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)                                                                                                                                                                                                                                                                                              
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)                                                                                               
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)                                                                      
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)                                                                      
>         at scala.Option.foreach(Option.scala:257)                                                                                                                                    
>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)                                                                                       
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)                                                                              
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)                                                                                
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)                                                                                                                                                                                                                                                                      
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)                                                                                                                                                                                                                                                                                                 
>         at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)                                                                                                    
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)                                                                                                             
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)                                                                            
>         ... 35 more                                                                                                                                                                  
> Caused by: java.io.FileNotFoundException: File file:/tmp/bucketpath/data/part-00000-0e35e712-a3f9-451a-856b-15dc690569ad-c000.snappy.parquet does not exist                          
> It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.                                                                                                                                                               
>         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)       
>         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)                                                                        
>         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)                                                                             
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)                                                                                                                                                                                                                                    
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)                                                                                                                                                                                                                                          
>         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)                                                                                   
>         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)                                                         
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)                                                                                                            
>         at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)                                                                   
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)                                                                                                
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)                                                                                                
>         at org.apache.spark.scheduler.Task.run(Task.scala:121)                                                                                                                       
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)                                                                                       
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)                                                                                                         
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)                                                                                                     
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)                                                                                           
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)                                                                                           
>         ... 1 more
> {code}
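
Note on the trace above: ImputerModelWriter.saveImpl is writing Parquet to /tmp/bucketpath while the same job's FileScanRDD is still trying to read a part file under /tmp/bucketpath/data. This suggests that the loaded model reads its data lazily from the very directory that overwrite() has just cleared. One way to avoid the conflict is to save to a different directory and swap it in afterwards. The following is a minimal sketch of that idea, not part of the original report or of Spark: resave_loaded_model and LazyModelStub are hypothetical names, the helper assumes the path is a directory on the local filesystem (as in the report), and the stub only mimics the read-on-save behaviour so that the example runs without Spark.

```python
import os
import shutil
import tempfile


def resave_loaded_model(model, path):
    """Save `model` over `path` without writing into the directory it reads from.

    Assumptions: `path` is a directory on the local filesystem, and `model`
    exposes the `write().overwrite().save(...)` chain used in the report.
    """
    parent = os.path.dirname(os.path.abspath(path))
    # Write next to the target so the final rename stays on one filesystem.
    tmp_path = tempfile.mkdtemp(prefix="resave-", dir=parent)
    try:
        model.write().overwrite().save(tmp_path)
    except Exception:
        # Leave the original directory untouched if the save fails.
        shutil.rmtree(tmp_path, ignore_errors=True)
        raise
    # The new copy is complete, so the old files can be dropped now.
    shutil.rmtree(path)
    os.rename(tmp_path, path)


class LazyModelStub:
    """Hypothetical stand-in for a loaded model: it reads its source on save."""

    def __init__(self, source_dir):
        self._source = os.path.join(source_dir, "data.txt")

    def write(self):
        return self

    def overwrite(self):
        return self

    def save(self, path):
        # Mimic overwrite semantics: clear the target first, then read lazily.
        shutil.rmtree(path, ignore_errors=True)
        os.makedirs(path)
        with open(self._source) as src:  # fails if `path` is the source dir
            payload = src.read()
        with open(os.path.join(path, "data.txt"), "w") as dst:
            dst.write(payload)
```

With the objects from the report this would be called as resave_loaded_model(li, fpath). Reloading with ImputerModel.load(fpath) afterwards is probably wise, since the in-memory model may still refer to the files that were removed.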


