Posted to issues@spark.apache.org by "Sean R. Owen (Jira)" <ji...@apache.org> on 2020/12/07 15:48:00 UTC
[jira] [Resolved] (SPARK-33373) A serialized ImputerModel fails to be serialized again
[ https://issues.apache.org/jira/browse/SPARK-33373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean R. Owen resolved SPARK-33373.
----------------------------------
Resolution: Not A Problem
> A serialized ImputerModel fails to be serialized again
> ------------------------------------------------------
>
> Key: SPARK-33373
> URL: https://issues.apache.org/jira/browse/SPARK-33373
> Project: Spark
> Issue Type: Bug
> Components: ML
> Affects Versions: 2.4.3
> Environment: * Python 3.7.3
> * (Py)Spark 2.4.3
> Reporter: Andre Boechat
> Priority: Major
>
> After loading an {{ImputerModel}} from disk, the instance fails when saving itself back to the path it was loaded from.
> h2. Code Sample
> {code:python}
> from pyspark.ml.feature import Imputer, ImputerModel
> from pyspark.sql import SparkSession
>
> # `sparksession` is an active SparkSession.
> sparksession = SparkSession.builder.getOrCreate()
>
> df = sparksession.createDataFrame(
>     [
>         (2.0, 3.0),
>         (2.0, 1.0),
>         (2.0, None),
>         (None, 0.0),
>     ],
>     ["x200", "x3"],
> ).repartition(1)
> i = Imputer(inputCols=["x200", "x3"], outputCols=["x200_i", "x3_i"]).fit(df)
> tdf = i.transform(df)
>
> fpath = "/tmp/bucketpath"
> i.write().overwrite().save(fpath)
> li = ImputerModel.load(fpath)
> t2df = li.transform(df)
>
> # The reloaded model produces the same transformed rows as the original.
> assert all(
>     r1.asDict() == r2.asDict()
>     for r1, r2 in zip(tdf.collect(), t2df.collect())
> )
>
> # This line makes Spark crash.
> li.write().overwrite().save(fpath)
> {code}
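> The failure appears to come from lazy evaluation: the loaded model's surrogate statistics seem to be backed by the parquet files under {{fpath}}, and {{overwrite()}} deletes those files before they are read back, which matches the {{FileNotFoundException}} in the stacktrace below. A minimal workaround sketch, assuming only that a second path is available ({{fpath2}} is illustrative, not part of the original report):
> {code:python}
> # Workaround sketch: save the loaded model to a *different* path, so the
> # parquet files backing its surrogate statistics are not deleted before
> # they are read. `fpath2` is a hypothetical path used for illustration.
> fpath2 = "/tmp/bucketpath_copy"
> li.write().overwrite().save(fpath2)
>
> # If the model must end up at the original path, reload it from the copy
> # first so it no longer depends on files under the original path.
> li2 = ImputerModel.load(fpath2)
> li2.write().overwrite().save(fpath)
> {code}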
> h2. Stacktrace
> {code}
> --> 480 li.write().overwrite().save(fpath)
>
> /usr/spark-2.4.3/python/pyspark/ml/util.py in save(self, path)
> 181 if not isinstance(path, basestring):
> 182 raise TypeError("path should be a basestring, got type %s" % type(path))
> --> 183 self._jwrite.save(path)
> 184
> 185 def overwrite(self):
>
> /usr/local/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
> 1284 answer = self.gateway_client.send_command(command)
> 1285 return_value = get_return_value(
> -> 1286 answer, self.gateway_client, self.target_id, self.name)
> 1287
> 1288 for temp_arg in temp_args:
>
> /usr/spark-2.4.3/python/pyspark/sql/utils.py in deco(*a, **kw)
> 61 def deco(*a, **kw):
> 62 try:
> ---> 63 return f(*a, **kw)
> 64 except py4j.protocol.Py4JJavaError as e:
> 65 s = e.java_exception.toString()
>
> /usr/local/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
> 326 raise Py4JJavaError(
> 327 "An error occurred while calling {0}{1}{2}.\n".
> --> 328 format(target_id, ".", name), value)
> 329 else:
> 330 raise Py4JError(
>
> Py4JJavaError: An error occurred while calling o572.save.
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
> at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
> at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
> at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
> at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
> at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
> at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
> at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
> at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
> at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
> at org.apache.spark.ml.feature.ImputerModel$ImputerModelWriter.saveImpl(Imputer.scala:252)
> at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:180)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:282)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 63.0 failed 1 times, most recent failure: Lost task 0.0 in stage 63.0 (TID 192, localhost, executor driver): java.io.FileNotFoundException: File file:/tmp/bucketpath/data/part-00000-0e35e712-a3f9-451a-856b-15dc690569ad-c000.snappy.parquet does not exist
> It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
> at scala.Option.foreach(Option.scala:257)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
> ... 35 more
> Caused by: java.io.FileNotFoundException: File file:/tmp/bucketpath/data/part-00000-0e35e712-a3f9-451a-856b-15dc690569ad-c000.snappy.parquet does not exist
> It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ... 1 more
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)