Posted to user@spark.apache.org by Alexis Seigneurin <as...@ippon.fr> on 2016/09/12 20:47:40 UTC

Strings not converted when calling Scala code from a PySpark app

Hi,


*TL;DR - I have what looks like a DStream of Strings in a PySpark
application. I want to send it as a DStream[String] to a Scala library.
Strings are not converted by Py4J, though.*


I'm working on a PySpark application that pulls data from Kafka using Spark
Streaming. My messages are strings and I would like to call a method in
Scala code, passing it a DStream[String] instance. However, I'm unable to
receive proper JVM strings in the Scala code. It looks to me like the
Python strings are not converted into Java strings but, instead, are
serialized.

My question is: how can I get Java strings out of the DStream object?


Here is the simplest Python code I came up with:

from pyspark.streaming import StreamingContext
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))

from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["IN"],
    {"metadata.broker.list": "localhost:9092"})
values = stream.map(lambda kv: kv[1])

ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)

ssc.start()


I'm running this code in PySpark, passing it the path to my JAR:

pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar


On the Scala side, I have:

package com.seigneurin

import org.apache.spark.streaming.api.java.JavaDStream

object MyPythonHelper {
  def doSomething(jdstream: JavaDStream[String]) = {
    val dstream = jdstream.dstream
    dstream.foreachRDD(rdd => {
      rdd.foreach(println)
    })
  }
}


Now, let's say I send some data into Kafka:

echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh \
  --broker-list localhost:9092 --topic IN


The println statement in the Scala code prints something that looks like:

[B@758aa4d9


I expected to get foo bar instead.

Now, if I replace the simple println statement in the Scala code with the
following:

rdd.foreach(v => println(v.getClass.getCanonicalName))


I get:

java.lang.ClassCastException: [B cannot be cast to java.lang.String


This suggests that the strings are actually passed as arrays of bytes.

If I simply try to convert this array of bytes into a string (I know I'm
not even specifying the encoding):

      def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
        val dstream = jdstream.dstream
        dstream.foreachRDD(rdd => {
          rdd.foreach(bytes => println(new String(bytes)))
        })
      }


I get something that looks like (special characters might be stripped off):

�]qXfoo barqa.


This suggests the Python string was serialized (pickled?). How could I
retrieve a proper Java string instead?
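
For what it's worth, here is a rough, untested sketch of how such a payload
might be decoded on the Scala side, assuming Spark's bundled Pyrolite library
(net.razorvine.pickle) is on the classpath - the exact shape of the unpickled
object (a single value vs. a batched list) is an assumption on my part:

import net.razorvine.pickle.Unpickler
import scala.collection.JavaConverters._

def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
  jdstream.dstream.foreachRDD(rdd => {
    rdd.foreach(bytes => {
      // assumption: each element is a pickled batch, e.g. a list holding the string
      new Unpickler().loads(bytes) match {
        case list: java.util.List[_] => list.asScala.foreach(println)
        case other => println(other)
      }
    })
  })
}

That said, I would rather receive proper strings directly than unpickle them by hand.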


Thanks,
Alexis

Re: Strings not converted when calling Scala code from a PySpark app

Posted by Alexis Seigneurin <as...@ipponusa.com>.
Makes sense. Thanks Holden.

Alexis


Re: Strings not converted when calling Scala code from a PySpark app

Posted by Holden Karau <ho...@pigscanfly.ca>.
Ah yes, the Py4J conversions only apply in the driver program - your
DStream, however, consists of RDDs of pickled objects. If you want to work
with it through a transform function, using Spark SQL and transferring
DataFrames back and forth between Python and Scala can be much easier.
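
For example, something along these lines might work (a rough, untested
sketch - doSomethingWithDF is a hypothetical Scala helper that takes a
DataFrame, and whether the JVM side sees a DataFrame or a Dataset[Row]
depends on your Spark version):

from pyspark.sql import SQLContext, Row

sqlContext = SQLContext(sc)

def send_to_scala(rdd):
    if not rdd.isEmpty():
        # build a DataFrame with a single string column per batch
        df = sqlContext.createDataFrame(rdd.map(lambda v: Row(value=v)))
        # _jdf is the underlying Java DataFrame; its "value" column arrives
        # as proper java.lang.String values on the Scala side
        sc._jvm.com.seigneurin.MyPythonHelper.doSomethingWithDF(df._jdf)

values.foreachRDD(send_to_scala)

That way the strings cross the Py4J boundary inside a DataFrame instead of
as pickled byte arrays.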
