Posted to users@zeppelin.apache.org by Mauro Schneider <ma...@gmail.com> on 2017/09/29 15:23:53 UTC

PySpark with livy

Hi

I'm trying to execute PySpark code with Zeppelin and Livy, but without success.
Scala with Livy works well, but when I execute the code below I get an
exception from Zeppelin.

<code>
%livy.pyspark
txtFile = sc.textFile("/data/staging/zeppelin_test/data.txt")
counts = txtFile.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
counts.collect()
</code>

<exception>
An error occurred while calling o47.textFile.
: java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
at org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:79)
at org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:156)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:200)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:200)
at scala.Option.map(Option.scala:145)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:200)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1334)
at org.apache.spark.SparkContext$$anonfun$hadoopFile$1.apply(SparkContext.scala:1022)
at org.apache.spark.SparkContext$$anonfun$hadoopFile$1.apply(SparkContext.scala:1019)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:722)
at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:1019)
at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:840)
at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:838)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:722)
at org.apache.spark.SparkContext.textFile(SparkContext.scala:838)
at org.apache.spark.api.java.JavaSparkContext.textFile(JavaSparkContext.scala:188)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
</exception>

I have also tested the code below with curl and Livy, and it works correctly.

<code /user/mulisses/test.py>
import sys
from pyspark import SparkContext

if __name__ == "__main__":
        sc = SparkContext(appName="Hello Spark")
        txtFile = sc.textFile("/data/staging/zeppelin_test/data.txt")
        counts = txtFile.flatMap(lambda line: line.split(" ")) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b)
        counts.saveAsTextFile("test_wc_py")
</code>

<cUrl>
curl -i --negotiate -u : -X POST \
  --data '{"file": "/user/mulisses/test.py"}' \
  -H "Content-Type: application/json" \
  dtbhad02p.bvs.corp:8998/batches
</cUrl>
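
The state and driver log of that batch can also be checked afterwards through Livy's
batch endpoints (the batch id 0 below is only an example; the real id is returned by
the POST above):

<cUrl>
# check the state of the submitted batch (id 0 is an example)
curl --negotiate -u : dtbhad02p.bvs.corp:8998/batches/0

# fetch the driver log of that batch
curl --negotiate -u : dtbhad02p.bvs.corp:8998/batches/0/log
</cUrl>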

Does anyone know how to solve this? I have tested with Zeppelin 0.7.2 and
Zeppelin 0.7.3.
Am I missing some configuration?

Best regards,

Mauro Schneider

Re: PySpark with livy

Posted by Jeff Zhang <zj...@gmail.com>.
I see your test with Livy via the curl command, but it seems you are submitting it
as a batch. Could you try it via an interactive Livy session instead? That is what
Zeppelin's Livy interpreter uses.
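
As a rough sketch (reusing the host and Kerberos options from your batch curl; the
session and statement ids 0 are just examples), the interactive flow through Livy's
REST API looks like this:

<cUrl>
# 1. create an interactive PySpark session (this is the kind of session the
#    Zeppelin Livy interpreter creates)
curl -i --negotiate -u : -X POST -H "Content-Type: application/json" \
  --data '{"kind": "pyspark"}' \
  dtbhad02p.bvs.corp:8998/sessions

# 2. poll the session (id 0 is an example) until its state is "idle"
curl --negotiate -u : dtbhad02p.bvs.corp:8998/sessions/0

# 3. run a statement in the session
curl -i --negotiate -u : -X POST -H "Content-Type: application/json" \
  --data '{"code": "sc.textFile(\"/data/staging/zeppelin_test/data.txt\").count()"}' \
  dtbhad02p.bvs.corp:8998/sessions/0/statements

# 4. read back the statement output
curl --negotiate -u : dtbhad02p.bvs.corp:8998/sessions/0/statements/0
</cUrl>

If the same UnsatisfiedLinkError shows up there, that would point at the interactive
session configuration rather than at Zeppelin itself.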



Re: PySpark with livy

Posted by Mauro Schneider <ma...@gmail.com>.
Hi Jeff

Yes, the code works with the PySpark shell and with spark-submit on the same
server where Zeppelin and Livy are running. I also did another test: I submitted
the same code to Livy with curl, and it worked fine.





Mauro Schneider



Re: PySpark with livy

Posted by Jeff Zhang <zj...@gmail.com>.
It is more likely a Spark configuration issue on your side. Could you run this code
in the pyspark shell?
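
For example, something like this (the launch command and --master value are just
assumptions; adjust them for your cluster):

<code>
# launch the interactive PySpark shell, e.g.
#   pyspark --master yarn
# then paste the same word count at the >>> prompt:
txtFile = sc.textFile("/data/staging/zeppelin_test/data.txt")
counts = txtFile.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
counts.collect()
</code>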


