You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Zhenyi Lin (JIRA)" <ji...@apache.org> on 2019/04/04 03:51:00 UTC

[jira] [Created] (SPARK-27375) cache not working after call discretizer.fit(df).transform

Zhenyi Lin created SPARK-27375:
----------------------------------

             Summary: cache not working after call discretizer.fit(df).transform
                 Key: SPARK-27375
                 URL: https://issues.apache.org/jira/browse/SPARK-27375
             Project: Spark
          Issue Type: Bug
          Components: Examples
    Affects Versions: 2.3.0
            Reporter: Zhenyi Lin


Below gives an example. col(r1) should be equal to col(r2) if cache operation works. However, after using discretizer fit and transformation DF, col(r1) and col(r2) becomes different

 

 

spark.catalog.clearCache()
import random
random.seed(123)

@udf(IntegerType())
def ri():(
 return random.choice([1,2,3,4,5,6,7,8,9])

df = spark.range(100).repartition("id")

#remove discretizer part, col(r1) will be equal to col(r2)
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="id", outputCol="quantileNo")
df = discretizer.fit(df).transform(df)

df = df.withColumn("r", ri()).cache()
df1 = df.withColumnRenamed("r", "r1")
df2 = df.withColumnRenamed("r", "r2")

df1.join(df2, "id").explain()
dfj = df1.join(df2, "id")
dfj.select("id", "r1", "r2").show(5)

 

The result is shown as below, we see that col(r1) and col(r2) are different. The physical plan of join operation shows that the cache() is missed. On the other hand, if we add one row df = df.rdd.toDF() before creating df1 and df2, or if we remove discretizer fit and transformation, col(r1) and col(r2) become the same. 

 

== Physical Plan ==
*(4) Project [id#15612L, quantileNo#15622, r1#15645, quantileNo#15653, r2#15649]
+- *(4) BroadcastHashJoin [id#15612L], [id#15655L], Inner, BuildRight
 :- *(4) Project [id#15612L, UDF:bucketizer_0(cast(id#15612L as double)) AS quantileNo#15622, pythonUDF0#15661 AS r1#15645]
 : +- BatchEvalPython [ri()], [id#15612L, pythonUDF0#15661]
 : +- Exchange hashpartitioning(id#15612L, 24)
 : +- *(1) Range (0, 100, step=1, splits=6)
 +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
 +- *(3) Project [id#15655L, UDF:bucketizer_0(cast(id#15655L as double)) AS quantileNo#15653, pythonUDF0#15662 AS r2#15649]
 +- BatchEvalPython [ri()], [id#15655L, pythonUDF0#15662]
 +- ReusedExchange [id#15655L], Exchange hashpartitioning(id#15612L, 24)
+---+---+---+
| id| r1| r2|
+---+---+---+
| 28| 9| 3|
| 30| 3| 6|
| 88| 1| 9|
| 67| 3| 3|
| 66| 1| 5|
+---+---+---+
only showing top 5 rows



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org