Posted to user@spark.apache.org by ey-chih chow <ey...@hotmail.com> on 2014/12/25 17:32:08 UTC

serialization issue with mapPartitions

Hi,

I got some issues with mapPartitions with the following piece of code:

    val sessions = sc
      .newAPIHadoopFile(
        "... path to an avro file ...",
        classOf[org.apache.avro.mapreduce.AvroKeyInputFormat[ByteBuffer]],
        classOf[AvroKey[ByteBuffer]],
        classOf[NullWritable],
        job.getConfiguration())
      .mapPartitions { valueIterator =>
        val config = job.getConfiguration()
                         .
                         .
                         .
      }
      .collect()

Why does job.getConfiguration() inside the mapPartitions function generate the
following message?

Cause: java.io.NotSerializableException: org.apache.hadoop.mapreduce.Job

If I take out 'val config = job.getConfiguration()' from the mapPartitions
block, the code works fine, even though
job.getConfiguration() also appears in newAPIHadoopFile().

Ey-Chih Chow





Re: serialization issue with mapPartitions

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Hi,

On Fri, Dec 26, 2014 at 1:32 AM, ey-chih chow <ey...@hotmail.com> wrote:
>
> I got some issues with mapPartitions with the following piece of code:
>
>     val sessions = sc
>       .newAPIHadoopFile(
>         "... path to an avro file ...",
>         classOf[org.apache.avro.mapreduce.AvroKeyInputFormat[ByteBuffer]],
>         classOf[AvroKey[ByteBuffer]],
>         classOf[NullWritable],
>         job.getConfiguration())
>       .mapPartitions { valueIterator =>
>         val config = job.getConfiguration()
>                          .
>       }
>       .collect()
>
> Why does job.getConfiguration() inside the mapPartitions function generate the
> following message?
>
> Cause: java.io.NotSerializableException: org.apache.hadoop.mapreduce.Job
>

The functions passed to mapPartitions() are executed on the Spark
executors, not on the Spark driver. Therefore, the function body needs to be
serialized and sent to the executors over the network. If that is not possible
(in your case, `job` cannot be serialized), you get a
NotSerializableException. It works inside newAPIHadoopFile() because that
call is executed on the driver.
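
To make the distinction concrete, here is a minimal sketch (the `rdd` value
and the Job construction are hypothetical stand-ins, not taken from the
original post):

    import org.apache.hadoop.mapreduce.Job

    val job = Job.getInstance()           // Job is not Serializable
    val conf = job.getConfiguration()     // fine: this call runs on the driver

    rdd.mapPartitions { valueIterator =>
      // Referencing `job` here forces Spark to serialize the whole Job object
      // into the task closure, which throws NotSerializableException.
      val config = job.getConfiguration()
      valueIterator
    }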

Tobias

RE: serialization issue with mapPartitions

Posted by "Shao, Saisai" <sa...@intel.com>.
Hi,

Hadoop Configuration is only Writable, not Java Serializable. You can use SerializableWritable (in Spark) to wrap the Configuration and make it serializable, and use a broadcast variable to ship this conf to all the nodes; then you can use it in mapPartitions rather than serializing it within the closure.

You can refer to org.apache.spark.rdd.HadoopRDD; it has a usage scenario similar to yours.
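
A rough sketch of that approach (untested; it assumes the Spark 1.x class org.apache.spark.SerializableWritable and reuses `sc`, `job`, and an `rdd` built as in the original post):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SerializableWritable

    // Wrap the Configuration so it can be Java-serialized, then broadcast it
    // once instead of capturing the non-serializable Job in the closure.
    val confBroadcast = sc.broadcast(new SerializableWritable(job.getConfiguration()))

    val sessions = rdd.mapPartitions { valueIterator =>
      val config: Configuration = confBroadcast.value.value   // unwrap on the executor
      // ... use config here ...
      valueIterator
    }.collect()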

Thanks
Jerry.

From: Tobias Pfeiffer [mailto:tgp@preferred.jp]
Sent: Friday, December 26, 2014 9:38 AM
To: ey-chih chow
Cc: user
Subject: Re: serialization issue with mapPartitions

Hi,

On Fri, Dec 26, 2014 at 10:13 AM, ey-chih chow <ey...@hotmail.com> wrote:
I should rephrase my question as follows:

How to use the corresponding Hadoop Configuration of a HadoopRDD in defining
a function as an input parameter to the MapPartitions function?

Well, you could try to pull the `val config = job.getConfiguration()` out of the function and just use `config` inside the function, hoping that this one is serializable.

Tobias



Re: serialization issue with mapPartitions

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Hi,

On Fri, Dec 26, 2014 at 10:13 AM, ey-chih chow <ey...@hotmail.com> wrote:

> I should rephrase my question as follows:
>
> How to use the corresponding Hadoop Configuration of a HadoopRDD in
> defining
> a function as an input parameter to the MapPartitions function?
>

Well, you could try to pull the `val config = job.getConfiguration()` out
of the function and just use `config` inside the function, hoping that this
one is serializable.
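
In code, that suggestion would look roughly like this (a sketch only; note
that a plain Configuration is Writable but not java.io.Serializable, so this
can still fail, and the SerializableWritable/broadcast approach above may be
needed):

    // Obtain the Configuration on the driver, outside the closure, so that
    // only `config` (not the whole Job) is captured by the task closure.
    val config = job.getConfiguration()

    val sessions = rdd.mapPartitions { valueIterator =>
      // use `config` here instead of calling job.getConfiguration()
      valueIterator
    }.collect()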

Tobias

Re: serialization issue with mapPartitions

Posted by Akhil <ak...@sigmoidanalytics.com>.
You cannot pass your jobConf object inside any of the transformation functions
in Spark (like map, mapPartitions, etc.), since
org.apache.hadoop.mapreduce.Job is not Serializable. You can use
KryoSerializer (see this doc:
http://spark.apache.org/docs/latest/tuning.html#data-serialization). What we
usually do is convert the JobConf into a byte array, pass that byte array
into the map, and recreate the jobConf (as a new variable) from the data
inside the byte array.
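
A rough sketch of that byte-array round trip, using the Configuration's
Writable interface (the helper names below are made up for illustration, and
`rdd`/`job` are assumed to be set up as in the original post):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
    import org.apache.hadoop.conf.Configuration

    // Hypothetical helpers: Configuration implements Writable, so it can be
    // written to / read from a plain byte array, which is serializable.
    def confToBytes(conf: Configuration): Array[Byte] = {
      val out = new ByteArrayOutputStream()
      conf.write(new DataOutputStream(out))
      out.toByteArray
    }

    def bytesToConf(bytes: Array[Byte]): Configuration = {
      val conf = new Configuration(false)
      conf.readFields(new DataInputStream(new ByteArrayInputStream(bytes)))
      conf
    }

    val confBytes = confToBytes(job.getConfiguration())   // plain Array[Byte], safe to capture

    val sessions = rdd.mapPartitions { valueIterator =>
      val config = bytesToConf(confBytes)                 // rebuild the Configuration on the executor
      // ... use config here ...
      valueIterator
    }.collect()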


> I should rephrase my question as follows:
> 
> How to use the corresponding Hadoop Configuration of a HadoopRDD in
> defining a function as an input parameter to the MapPartitions function?
> 
> Thanks.
> 
> Ey-Chih Chow







Re: serialization issue with mapPartitions

Posted by ey-chih chow <ey...@hotmail.com>.
I should rephrase my question as follows:

How do I use the Hadoop Configuration that corresponds to a HadoopRDD when
defining a function passed as an input parameter to mapPartitions?

Thanks.

Ey-Chih Chow


