Posted to issues@spark.apache.org by "Berg Lloyd-Haig (Jira)" <ji...@apache.org> on 2023/10/25 06:45:00 UTC

[jira] [Commented] (SPARK-43108) org.apache.spark.storage.StorageStatus NotSerializableException when try to access StorageStatus in a MapPartitionsFunction

    [ https://issues.apache.org/jira/browse/SPARK-43108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779355#comment-17779355 ] 

Berg Lloyd-Haig commented on SPARK-43108:
-----------------------------------------

We are hitting this when reading from Snowflake with Spark 3.4 and then printing some data to stdout or writing to Parquet.

It only occurs on the first invocation; after that, the error no longer appears.

> org.apache.spark.storage.StorageStatus NotSerializableException when try to access StorageStatus in a MapPartitionsFunction
> ---------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-43108
>                 URL: https://issues.apache.org/jira/browse/SPARK-43108
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.2.1
>            Reporter: surender godara
>            Priority: Minor
>
> When you try to access the *storage status (org.apache.spark.storage.StorageStatus)* inside a MapPartitionsFunction, the getStorageStatus method throws a NotSerializableException. This exception is thrown because the StorageStatus object is not serializable.
> Here is an example code snippet that demonstrates how to access the storage status inside a MapPartitionsFunction in Spark:
> {code:java}
> StorageStatus[] storageStatus = SparkEnv.get().blockManager().master().getStorageStatus();{code}
> *Error stacktrace --*
> {code:java}
> Caused by: java.io.NotSerializableException: org.apache.spark.storage.StorageStatus
> Serialization stack:
>     - object not serializable (class: org.apache.spark.storage.StorageStatus, value: org.apache.spark.storage.StorageStatus@715b4e82)
>     - element of array (index: 0)
>     - array (class [Lorg.apache.spark.storage.StorageStatus;, size 2)
>     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
>     at org.apache.spark.rpc.netty.NettyRpcEnv.serialize(NettyRpcEnv.scala:286)
>     at org.apache.spark.rpc.netty.RemoteNettyRpcCallContext.send(NettyRpcCallContext.scala:64)
>     at org.apache.spark.rpc.netty.NettyRpcCallContext.reply(NettyRpcCallContext.scala:32)
>     at org.apache.spark.storage.BlockManagerMasterEndpoint$$anonfun$receiveAndReply$1.applyOrElse(BlockManagerMasterEndpoint.scala:156)
>     at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:103)
>     at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
>     at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
>     at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
>     at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264){code}
> *Steps to reproduce*
> Step 1 - Initialize a Spark session in Spark standalone mode.
> Step 2 - Create a Dataset using the SparkSession and load data.
> Step 3 - Define the MapPartitionsFunction on the Dataset and get the storage status inside it.
> Here is the code snippet of MapPartitionsFunction
>  
> {code:java}
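> df = df.mapPartitions(new MapPartitionsFunction<Row, Row>() {
>     @Override
>     public Iterator<Row> call(Iterator<Row> input) throws Exception {
>         // Called from an executor: the reply carrying StorageStatus[] fails to
>         // serialize on the BlockManagerMasterEndpoint (see the stack trace above).
>         StorageStatus[] storageStatus = SparkEnv.get().blockManager().master().getStorageStatus();
>         return input;
>     }
> }, RowEncoder.apply(df.schema()));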
> {code}
>  
> Step 4 - Submit the Spark job.
>  
> *Solution -*
> Implement the Serializable interface for org.apache.spark.storage.StorageStatus.
>  
>  
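
The serialization stack in the quoted trace (an array whose element at index 0 is not serializable, written through JavaSerializationStream) can be reproduced without Spark. The following is only a minimal sketch using plain Java serialization. PlainStatus, SerializableStatus, and canSerialize are hypothetical names invented for illustration and are not part of Spark. It shows the general mechanism behind the proposed solution; it does not check whether every field of the real StorageStatus class would serialize.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationSketch {

    // Hypothetical stand-in for a status class that does NOT implement Serializable.
    static class PlainStatus {
        final String executorId;

        PlainStatus(String executorId) {
            this.executorId = executorId;
        }
    }

    // Hypothetical stand-in for the same class after implementing Serializable.
    static class SerializableStatus implements Serializable {
        private static final long serialVersionUID = 1L;
        final String executorId;

        SerializableStatus(String executorId) {
            this.executorId = executorId;
        }
    }

    // Returns true if Java serialization accepts the value, and false if it
    // fails with NotSerializableException.
    static boolean canSerialize(Object value) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // An array is itself serializable, but only if every element is.
        PlainStatus[] plain = { new PlainStatus("0"), new PlainStatus("1") };
        SerializableStatus[] fixed = { new SerializableStatus("0"), new SerializableStatus("1") };

        System.out.println("plain array serializable: " + canSerialize(plain)); // false
        System.out.println("fixed array serializable: " + canSerialize(fixed)); // true
    }
}
```

In this sketch the array of PlainStatus fails in the same way as the array in the trace: the array itself is accepted, and the first element is rejected.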


