Posted to user@spark.apache.org by pratik gawande <pr...@hotmail.com> on 2016/05/06 04:47:18 UTC

Fw: Significant performance difference for same spark job in scala vs pyspark

Hello,

I am new to Spark. For one of my jobs I am seeing a significant performance difference between PySpark and Scala. Could you please let me know whether this is a known issue and whether Scala is preferred over Python for writing Spark jobs? Also, the DAG visualization shows completely different DAGs for Scala and PySpark. I have pasted the DAG for both, obtained with the toDebugString() method. Let me know if you need any additional information.

Time for job in Scala: 52 secs

Time for job in PySpark: 4.2 min


Scala code in Zeppelin:

val lines = sc.textFile("s3://[test-bucket]/output2/")
val words = lines.flatMap(line => line.split(" "))
val filteredWords = words.filter(word => word.equals("Gutenberg") || word.equals("flower") || word.equals("a"))
val wordMap = filteredWords.map(word => (word, 1)).reduceByKey(_ + _)
wordMap.collect()

PySpark code in Zeppelin:

lines = sc.textFile("s3://[test-bucket]/output2/")
words = lines.flatMap(lambda x: x.split())
filteredWords = words.filter(lambda x: (x == "Gutenberg" or x == "flower" or x == "a"))
result = filteredWords.map(lambda x: (x, 1)).reduceByKey(lambda a,b: a+b).collect()
print result


Scala final RDD:

println(wordMap.toDebugString)

lines: org.apache.spark.rdd.RDD[String] = s3://[test-bucket]/output2/ MapPartitionsRDD[108] at textFile at <console>:30
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[109] at flatMap at <console>:31
filteredWords: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[110] at filter at <console>:33
wordMap: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[112] at reduceByKey at <console>:35
(10) ShuffledRDD[112] at reduceByKey at <console>:35 []
 +-(10) MapPartitionsRDD[111] at map at <console>:35 []
    |   MapPartitionsRDD[110] at filter at <console>:33 []
    |   MapPartitionsRDD[109] at flatMap at <console>:31 []
    |   s3://[test-bucket]/output2/ MapPartitionsRDD[108] at textFile at <console>:30 []
    |   s3://[test-bucket]/output2/ HadoopRDD[107] at textFile at <console>:30 []


PySpark final RDD:

print wordMap.toDebugString()

(10) PythonRDD[119] at RDD at PythonRDD.scala:43 []
 |   s3://[test-bucket]/output2/ MapPartitionsRDD[114] at textFile at null:-1 []
 |   s3://[test-bucket]/output2/ HadoopRDD[113] at textFile at null:-1 []
PythonRDD[120] at RDD at PythonRDD.scala:43


Thanks,

Pratik

Re: Fw: Significant performance difference for same spark job in scala vs pyspark

Posted by nguyen duc tuan <ne...@gmail.com>.
Try using a DataFrame instead of an RDD.
Here's an introduction to DataFrames:
https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
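
As a rough sketch of what this suggestion could look like for the word count in this thread (nobody in the thread posted this code): the function name count_selected_words and its parameters are made up for illustration. It assumes that session is a SparkSession (or a SQLContext on Spark 1.x) and that read.text yields one row per line in a column named "value". The split, filter and aggregation are written as column expressions instead of Python lambdas, so the per-record work should not have to go through the Python interpreter.

```python
def count_selected_words(session, path, wanted=("Gutenberg", "flower", "a")):
    """Count occurrences of the wanted words using DataFrame operations.

    `session` is assumed to be a SparkSession, or a SQLContext on Spark 1.x;
    both expose a `.read` attribute.
    """
    # Imported here so this module can still be loaded on a machine without Spark.
    from pyspark.sql import functions as F

    # One row per input line, in a single string column named "value".
    lines = session.read.text(path)

    # Split each line on whitespace and turn every token into its own row.
    words = lines.select(
        F.explode(F.split(F.col("value"), r"\s+")).alias("word")
    )

    # Keep only the words of interest, then count rows per word.
    counts = words.filter(F.col("word").isin(*wanted)).groupBy("word").count()

    # The result is tiny (at most len(wanted) rows), so collecting it is cheap.
    return {row["word"]: row["count"] for row in counts.collect()}
```

In a notebook this would be called as count_selected_words(spark, "s3://[test-bucket]/output2/"), where spark is whatever session object the notebook provides.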

2016-05-06 21:52 GMT+07:00 pratik gawande <pr...@hotmail.com>:

> Thanks, Shao, for the quick reply. I will look into how PySpark jobs are
> executed. Any suggestions or references to docs on how to tune PySpark jobs?

Re: Fw: Significant performance difference for same spark job in scala vs pyspark

Posted by pratik gawande <pr...@hotmail.com>.
Thanks, Shao, for the quick reply. I will look into how PySpark jobs are executed. Any suggestions or references to docs on how to tune PySpark jobs?





On Thu, May 5, 2016 at 10:12 PM -0700, "Saisai Shao" <sa...@gmail.com> wrote:

Writing an RDD-based application in PySpark brings additional overhead. Spark runs on the JVM, whereas your Python code runs in a Python runtime, so data has to be passed between the JVM and the Python process, which requires additional serialization, deserialization, and IPC. Other parts add overhead as well. So the performance difference is expected, but you could tune the application to reduce the gap.

Also, because the Python RDD wraps a lot, the DAG you saw is different from the Scala one; that is also expected.

Thanks
Saisai




Re: Fw: Significant performance difference for same spark job in scala vs pyspark

Posted by Saisai Shao <sa...@gmail.com>.
Writing an RDD-based application in PySpark brings additional overhead.
Spark runs on the JVM, whereas your Python code runs in a Python runtime,
so data has to be passed between the JVM and the Python process, which
requires additional serialization, deserialization, and IPC. Other parts
add overhead as well. So the performance difference is expected, but you
could tune the application to reduce the gap.
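
To give a feel for the serialization cost described above, here is a toy comparison in plain Python. It does not use Spark at all: the pickle round trip per batch is only a stand-in for handing records from the JVM to a Python worker, and the function names, the sample sentence and the batch size of 1000 are all made up for illustration.

```python
import pickle
import time


def count_in_process(words, wanted):
    """Count the wanted words directly, with no serialization step."""
    counts = {}
    for word in words:
        if word in wanted:
            counts[word] = counts.get(word, 0) + 1
    return counts


def count_with_round_trip(words, wanted, batch_size=1000):
    """Same count, but each batch is pickled and unpickled before use."""
    counts = {}
    for start in range(0, len(words), batch_size):
        # Stand-in for crossing a process boundary: serialize, then deserialize.
        batch = pickle.loads(pickle.dumps(words[start:start + batch_size]))
        for word in batch:
            if word in wanted:
                counts[word] = counts.get(word, 0) + 1
    return counts


def timed(fn, *args):
    """Run fn(*args) and return (result, elapsed seconds)."""
    started = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - started


words = "Project Gutenberg a flower in a field".split() * 20000
wanted = {"Gutenberg", "flower", "a"}

direct, t_direct = timed(count_in_process, words, wanted)
round_trip, t_round_trip = timed(count_with_round_trip, words, wanted)

print("same result:", direct == round_trip)
print("in-process: %.3fs, with pickle round trip: %.3fs" % (t_direct, t_round_trip))
```

Both functions return the same counts; only the elapsed times differ, and the exact numbers will vary from machine to machine.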

Also, because the Python RDD wraps a lot, the DAG you saw is different
from the Scala one; that is also expected.
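
One way to picture the wrapping, as a plain-Python sketch rather than PySpark's actual code: if consecutive per-record steps are composed into a single function over a partition's iterator, an outside observer sees one opaque step instead of separate flatMap, filter and map steps. All helper names below (flat_map, keep_if, map_each, pipeline) are hypothetical.

```python
def flat_map(f):
    """Stage that expands every record into zero or more records."""
    return lambda records: (out for rec in records for out in f(rec))


def keep_if(predicate):
    """Stage that keeps only records matching the predicate."""
    return lambda records: (rec for rec in records if predicate(rec))


def map_each(f):
    """Stage that transforms every record."""
    return lambda records: (f(rec) for rec in records)


def pipeline(*stages):
    """Compose stages into one function over a partition's iterator."""
    def run(records):
        for stage in stages:
            records = stage(records)
        return records
    return run


wanted = {"Gutenberg", "flower", "a"}

# Three logical steps, but only one function is applied to the partition.
fused = pipeline(
    flat_map(str.split),
    keep_if(lambda word: word in wanted),
    map_each(lambda word: (word, 1)),
)

partition = ["Project Gutenberg", "a flower and a tree"]
pairs = list(fused(iter(partition)))
print(pairs)
```

The fused function plays the role of the single Python node in the debug output, while the Scala lineage lists each step separately.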

Thanks
Saisai

