Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:35:31 UTC

[jira] [Resolved] (SPARK-13773) UDF being applied to filtered data

     [ https://issues.apache.org/jira/browse/SPARK-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-13773.
----------------------------------
    Resolution: Incomplete

> UDF being applied to filtered data 
> -----------------------------------
>
>                 Key: SPARK-13773
>                 URL: https://issues.apache.org/jira/browse/SPARK-13773
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: James Hammerton
>            Priority: Major
>              Labels: bulk-closed
>
> Given the following code:
> {code:title=ReproduceSparkBug.scala|borderStyle=solid}
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.functions.udf
> import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
> object ReproduceSparkBug {
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setMaster("local")
>       .setAppName("ReproduceSparkBug")
>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>     val schema = StructType(Array(
>       StructField("userId", DataTypes.StringType),
>       StructField("objectId", DataTypes.StringType),
>       StructField("eventName", DataTypes.StringType),
>       StructField("eventJson", DataTypes.StringType),
>       StructField("timestamp", DataTypes.LongType)))
>     val sc = new SparkContext(conf)
>     val sqlContext = new SQLContext(sc)
>     val df = sqlContext.read.format("com.databricks.spark.csv")
>       .option("delimiter", "\t")
>       .option("header", "false")
>       .schema(schema).load("src/test/resources/foo.txt")
>     // Keep only the "*Created" events; the UDF below should only ever see these rows.
>     val filtered = df.filter(df("eventName").endsWith("Created"))
>     val extracted = filtered.select(filtered("objectId"),
>       extractorUDF(filtered("eventJson"), filtered("objectId"), filtered("userId")) as "info")
>     extracted.filter(extracted("info").notEqual("NO REFS"))
>       .collect
>       .map(r => (r.getString(0), r.getString(1)))
>       .foreach(println)
>     sc.stop()
>   }
>   def extractorUDF = udf(extractor(_: String, _: String, _: String))
>   // Prints every row it is invoked on, which makes it visible when the
>   // UDF runs against rows the filter should already have excluded.
>   def extractor(eventJson: String, objectId: String, userId: String): String = {
>     println(eventJson + ":" + objectId + ":" + userId)
>     eventJson + ":" + objectId + ":" + userId
>   }
> }
> {code}
> If "src/test/resources" contains the following:
> {noformat}
> 113	0c38c6c327224e43a46f663b6424612f	Created	{"field":"value"}	1000
> 113	0c38c6c327224e43a46f663b6424612f	LabelRemoved	{"this":"should not appear"}	1000
> {noformat}
> Then the code prints the following to stdout:
> {noformat}
> {"field":"value"}:0c38c6c327224e43a46f663b6424612f:113
> {"field":"value"}:0c38c6c327224e43a46f663b6424612f:113
> {"this":"should not appear"}:0c38c6c327224e43a46f663b6424612f:113
> (0c38c6c327224e43a46f663b6424612f,{"field":"value"}:0c38c6c327224e43a46f663b6424612f:113)
> {noformat}
> If the first filter is cached (i.e. we write 'val filtered = df.filter(df("eventName").endsWith("Created")).cache'), then only the first and last lines of the output above appear.
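> A quick way to check what the planner actually did is to print the physical plan of the final query (DataFrame.explain() is available in 1.6); whether the UDF shows up below the filter will depend on the plan Spark chooses:
> {code:title=InspectPlan.scala|borderStyle=solid}
> // Print the physical plan; if the Project that evaluates extractorUDF
> // sits below the eventName filter (i.e. closer to the scan), the UDF
> // is being evaluated before the rows are filtered.
> extracted.filter(extracted("info").notEqual("NO REFS")).explain()
> {code}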
> What I think is happening is that the UDF is applied to the unfiltered data and the filter is only applied afterwards, so the final results come out correct. For some reason the UDF also appears to be applied more than once to the rows that pass the filter.
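> For reference, newer Spark versions provide a switch for exactly this situation, though it does not help the 1.6.0 reported here; a minimal sketch assuming Spark 2.3+, which added asNondeterministic:
> {code:title=NondeterministicUDF.scala|borderStyle=solid}
> // Spark 2.3+ only: marking the UDF non-deterministic keeps the optimizer
> // from reordering it around filters or duplicating its evaluation.
> import org.apache.spark.sql.functions.udf
>
> val extractorUDF = udf(extractor(_: String, _: String, _: String)).asNondeterministic()
> {code}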
> This caused problems in my original code, where JSON parsing inside the UDF threw exceptions because the UDF was applied to rows that should have been filtered out. The original code read from Parquet; I switched to a tab-separated file here to make the behaviour easier to see and post.
> I suspect the bug has gone unnoticed until now because the correct results are produced in the end; the UDF would have to cause task failures when applied to the filtered-out rows for anyone to notice.
> Note that I could not reproduce this unless the data was read in from a file. 
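> Until the underlying behaviour is fixed, the exception problem described above can be worked around by making the UDF total; a minimal sketch (safeExtractor is an illustrative name, not from my original code):
> {code:title=DefensiveUDF.scala|borderStyle=solid}
> import scala.util.Try
> import org.apache.spark.sql.functions.udf
>
> // Never throws, even when handed a row the filter should have removed;
> // failures collapse to the same "NO REFS" sentinel the driver filters on.
> def safeExtractor(eventJson: String, objectId: String, userId: String): String =
>   Try(extractor(eventJson, objectId, userId)).getOrElse("NO REFS")
>
> def safeExtractorUDF = udf(safeExtractor(_: String, _: String, _: String))
> {code}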



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org