Posted to dev@spark.apache.org by Jai Kumar Singh <fl...@flukebox.in> on 2014/07/11 14:04:59 UTC

Calling Scala/Java methods which operate on RDDs

Hi,
  I want to write some common utility functions in Scala and call them from
the Java/Python Spark APIs (perhaps with some wrapper code around the Scala
calls). Calling the Scala functions from Java works fine. Reading the pyspark
rdd code, I found that pyspark can call JavaRDD functions like union/zip for a
pyspark RDD, deserialize the output, and everything works fine. But somehow I
am not able to get a really simple example working. I think I am missing some
serialization/deserialization step.

Can someone confirm whether it is even possible to do this? Or would it be much
easier to pass RDD data files around instead of RDDs directly (from pyspark
to Java/Scala)?

For example, the code below just adds 1 to each element of an RDD of
Integers.

package flukebox.test

import org.apache.spark.rdd.RDD

object TestClass {
  // Add 1 to every element of the given RDD
  def testFunc(data: RDD[Int]): RDD[Int] = {
    data.map(x => x + 1)
  }
}

Calling from Python:

from pyspark import RDD
from py4j.java_gateway import java_import

java_import(sc._gateway.jvm, "flukebox.test")

data = sc.parallelize([1,2,3,4,5,6,7,8,9])
sc._jvm.flukebox.test.TestClass.testFunc(data._jrdd.rdd())


*This fails because testFunc receives an RDD of byte arrays, not an RDD[Int].*


Any help/pointer would be highly appreciated.


Thanks & Regards,

Jai K Singh

Re: Calling Scala/Java methods which operate on RDDs

Posted by Kan Zhang <kz...@apache.org>.
Hi Jai,

Your suspicion is correct. In general, Python RDDs are pickled into byte
arrays and stored on the Java side as RDDs of byte arrays. union/zip operate
on the byte arrays directly without deserializing them. Currently, Python byte
arrays only get unpickled into Java objects in special cases, like the SQL
functions or saving to SequenceFiles (upcoming).
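
To make the mismatch concrete, here is a minimal sketch of what a JVM-side
helper would have to do before it could see Ints. It assumes PySpark's default
batched pickle serializer (so each byte array is a pickled list of elements)
and that Pyrolite's net.razorvine.pickle.Unpickler is available on the
classpath (Spark uses Pyrolite for its own Python SerDe in recent versions,
but check your build); the object and method names are just illustrative, not
an existing API.

package flukebox.test

import scala.collection.JavaConverters._

import net.razorvine.pickle.Unpickler
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD

object PickledTestClass {
  // Illustrative sketch: accept the RDD of pickled byte arrays that
  // data._jrdd actually refers to, unpickle each batch, and add 1.
  def testFunc(pickled: JavaRDD[Array[Byte]]): RDD[Int] = {
    pickled.rdd.mapPartitions { batches =>
      val unpickler = new Unpickler  // not thread-safe, so one per partition
      batches.flatMap { bytes =>
        // With the batched serializer, each byte array unpickles to a list
        unpickler.loads(bytes)
          .asInstanceOf[java.util.ArrayList[Any]]
          .asScala
          .map(_.asInstanceOf[Number].intValue + 1)
      }
    }
  }
}

Getting the result back into Python as a usable RDD would additionally require
re-pickling it on the JVM side, which is what the SQL and upcoming SequenceFile
paths do internally. For a quick solution, passing data files between the two
languages, as you suggested, is likely simpler.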

Hope it helps.

Kan

