Posted to user@spark.apache.org by Pavel Klemenkov <pk...@gmail.com> on 2017/05/10 14:11:17 UTC

[Spark Core]: Python and Scala generate different DAGs for identical code

This Scala code:
scala> val logs = sc.textFile("big_data_specialization/log.txt").
     | filter(x => !x.contains("INFO")).
     | map(x => (x.split("\t")(1), 1)).
     | reduceByKey((x, y) => x + y)

generated the expected lineage:

(2) ShuffledRDD[4] at reduceByKey at <console>:27 []
 +-(2) MapPartitionsRDD[3] at map at <console>:26 []
    |  MapPartitionsRDD[2] at filter at <console>:25 []
    |  big_data_specialization/log.txt MapPartitionsRDD[1] at textFile at <console>:24 []
    |  big_data_specialization/log.txt HadoopRDD[0] at textFile at <console>:24 []

But this Python code:

logs = sc.textFile("../log.txt")\
         .filter(lambda x: 'INFO' not in x)\
         .map(lambda x: (x.split('\t')[1], 1))\
         .reduceByKey(lambda x, y: x + y)

generated a strange lineage that is hard to follow:

(2) PythonRDD[13] at RDD at PythonRDD.scala:48 []
 |  MapPartitionsRDD[12] at mapPartitions at PythonRDD.scala:422 []
 |  ShuffledRDD[11] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(2) PairwiseRDD[10] at reduceByKey at <ipython-input-9-d6a34e0335b0>:1 []
    |  PythonRDD[9] at reduceByKey at <ipython-input-9-d6a34e0335b0>:1 []
    |  ../log.txt MapPartitionsRDD[8] at textFile at NativeMethodAccessorImpl.java:0 []
    |  ../log.txt HadoopRDD[7] at textFile at NativeMethodAccessorImpl.java:0 []

Why is that? Does PySpark do some optimizations under the hood? As it stands,
this debug string is of little use for debugging.
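
One possible reading of the Python lineage, sketched in plain Python with no
Spark involved: consecutive Python transformations (filter, map) seem to be
fused into a single PythonRDD, and reduceByKey seems to expand into a
per-partition pre-aggregation, a partitionBy shuffle, and a final per-partition
merge. The helper names and the sample log lines below are hypothetical and
only mimic that shape; they are not PySpark's actual internals.

```python
def fused_filter_map(lines):
    """Hypothetical stand-in for the fused filter + map step (one pass per partition)."""
    for line in lines:
        if 'INFO' not in line:
            yield (line.split('\t')[1], 1)


def combine_by_key(pairs, func):
    """Aggregate the values of each key inside a single partition."""
    combined = {}
    for key, value in pairs:
        combined[key] = func(combined[key], value) if key in combined else value
    return list(combined.items())


def partition_by(partitions, num_partitions):
    """Shuffle step: route every (key, value) pair to a partition chosen by key hash."""
    shuffled = [[] for _ in range(num_partitions)]
    for part in partitions:
        for key, value in part:
            shuffled[hash(key) % num_partitions].append((key, value))
    return shuffled


def reduce_by_key_sketch(partitions, func, num_partitions=2):
    # Step 1: fused filter/map plus local pre-aggregation, before the shuffle.
    locally_combined = [combine_by_key(fused_filter_map(p), func) for p in partitions]
    # Step 2: the shuffle, which redistributes pairs so each key lives in one partition.
    shuffled = partition_by(locally_combined, num_partitions)
    # Step 3: merge the partial results per key after the shuffle.
    return [combine_by_key(p, func) for p in shuffled]


# Made-up, tab-separated log lines split across two input partitions.
sample_partitions = [
    ["t1\tERROR\tdisk full", "t2\tINFO\tstarted", "t3\tWARN\tslow"],
    ["t4\tERROR\ttimeout", "t5\tERROR\tdisk full", "t6\tINFO\tstopped"],
]

result = reduce_by_key_sketch(sample_partitions, lambda x, y: x + y)
counts = dict(pair for part in result for pair in part)
print(counts)
```

If this reading is right, steps 1 to 3 would line up with the entries below the
shuffle boundary, the ShuffledRDD from partitionBy, and the entries above it,
which would explain why filter and map never show up as separate lines.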
-- 
Yours faithfully, Pavel Klemenkov.