Posted to issues@spark.apache.org by "surender godara (Jira)" <ji...@apache.org> on 2023/04/12 14:59:00 UTC

[jira] [Created] (SPARK-43108) org.apache.spark.storage.StorageStatus NotSerializableException when trying to access StorageStatus in a MapPartitionsFunction

surender godara created SPARK-43108:
---------------------------------------

             Summary: org.apache.spark.storage.StorageStatus NotSerializableException when trying to access StorageStatus in a MapPartitionsFunction
                 Key: SPARK-43108
                 URL: https://issues.apache.org/jira/browse/SPARK-43108
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.2.1
            Reporter: surender godara


When you try to access the *storage status (org.apache.spark.storage.StorageStatus)* inside a MapPartitionsFunction, the getStorageStatus call throws a NotSerializableException. The failure happens because StorageStatus is not serializable: the call is answered by the driver's BlockManagerMasterEndpoint, and as the stack trace below shows, serializing the StorageStatus[] reply for the RPC back to the executor is what fails.

Here is the line that triggers the exception when executed inside a MapPartitionsFunction in Spark:
{code:java}
StorageStatus[] storageStatus = SparkEnv.get().blockManager().master().getStorageStatus();{code}
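
For contrast, the same lookup generally succeeds when issued on the driver, where the BlockManagerMasterEndpoint answers through a local call context and the reply never has to be serialized (a minimal sketch; the behavior is inferred from the RemoteNettyRpcCallContext frame in the stack trace below):
{code:java}
import org.apache.spark.SparkEnv;
import org.apache.spark.storage.StorageStatus;

// Driver side: the RPC reply is delivered in-process, so the
// non-serializable StorageStatus[] never crosses the wire.
StorageStatus[] onDriver =
    SparkEnv.get().blockManager().master().getStorageStatus();
System.out.println("Block managers tracked: " + onDriver.length);
{code}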

*Error stacktrace --*
{code:java}
Caused by: java.io.NotSerializableException: org.apache.spark.storage.StorageStatus
Serialization stack:
    - object not serializable (class: org.apache.spark.storage.StorageStatus, value: org.apache.spark.storage.StorageStatus@715b4e82)
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.storage.StorageStatus;, size 2)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.rpc.netty.NettyRpcEnv.serialize(NettyRpcEnv.scala:286)
    at org.apache.spark.rpc.netty.RemoteNettyRpcCallContext.send(NettyRpcCallContext.scala:64)
    at org.apache.spark.rpc.netty.NettyRpcCallContext.reply(NettyRpcCallContext.scala:32)
    at org.apache.spark.storage.BlockManagerMasterEndpoint$$anonfun$receiveAndReply$1.applyOrElse(BlockManagerMasterEndpoint.scala:156)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:103)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264){code}

*Steps to reproduce*

Step 1 - Create a SparkSession that connects to a Spark standalone master.

Step 2 - Create a Dataset from the SparkSession by loading data from a file or a database.

Step 3 - Define a MapPartitionsFunction that fetches the storage status inside its call method.

Here is the code snippet of the MapPartitionsFunction:
{code:java}
df = df.mapPartitions(new MapPartitionsFunction<Row, Row>() {
    @Override
    public Iterator<Row> call(Iterator<Row> input) throws Exception {
        // Runs on an executor: the driver has to serialize the
        // StorageStatus[] reply to answer this call, which throws the
        // NotSerializableException shown above.
        StorageStatus[] storageStatus =
            SparkEnv.get().blockManager().master().getStorageStatus();
        return input;
    }
}, RowEncoder.apply(df.schema()));
{code}

Step 4 - Submit the Spark job; the stack trace shown above is produced.

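Until StorageStatus is serializable, one possible workaround is to read the status on the driver before the job and capture only plain serializable values in the closure. A sketch, assuming the task only needs a few summary fields (StorageSummary below is a hypothetical helper, not a Spark class):
{code:java}
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkEnv;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.storage.StorageStatus;

// Hypothetical serializable holder for the fields the task needs.
class StorageSummary implements Serializable {
    final String host;
    final long maxMemory;
    StorageSummary(String host, long maxMemory) {
        this.host = host;
        this.maxMemory = maxMemory;
    }
}

// On the driver, where the RPC reply stays in-process:
List<StorageSummary> summaries = new ArrayList<>();
for (StorageStatus s : SparkEnv.get().blockManager().master().getStorageStatus()) {
    summaries.add(new StorageSummary(s.blockManagerId().host(), s.maxMemory()));
}

// The closure now captures only serializable values.
df = df.mapPartitions((MapPartitionsFunction<Row, Row>) input -> {
    System.out.println("Known block managers: " + summaries.size());
    return input;
}, RowEncoder.apply(df.schema()));
{code}
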
*Solution -*

Make org.apache.spark.storage.StorageStatus implement the java.io.Serializable interface so the BlockManagerMasterEndpoint can serialize its reply to executors.
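
The stack trace above shows the reply being written by Spark's built-in JavaSerializer, so java.io.Serializable is the contract in play. The same failure mode can be illustrated self-contained with the plain JDK, no Spark involved:
{code:java}
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;

public class SerializationCheck {
    // Stand-in for StorageStatus: note the missing Serializable marker.
    static class StatusLike { }

    public static void main(String[] args) throws Exception {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            // Mirrors JavaSerializationStream.writeObject on the RPC reply:
            // the array itself is serializable, but its elements are not.
            out.writeObject(new StatusLike[] { new StatusLike() });
        } catch (NotSerializableException e) {
            System.out.println("Same failure mode: " + e);
        }
    }
}
{code}
Adding the Serializable marker to StorageStatus (and to anything it references transitively that lacks one) would presumably let this writeObject call, and therefore the RPC reply, succeed.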

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org