Posted to issues@spark.apache.org by "James Hammerton (JIRA)" <ji...@apache.org> on 2016/03/09 11:26:40 UTC

[jira] [Created] (SPARK-13773) UDF being applied to filtered data

James Hammerton created SPARK-13773:
---------------------------------------

             Summary: UDF being applied to filtered data 
                 Key: SPARK-13773
                 URL: https://issues.apache.org/jira/browse/SPARK-13773
             Project: Spark
          Issue Type: Bug
    Affects Versions: 1.6.0
            Reporter: James Hammerton


Given the following code:
{code:title=ReproduceSparkBug.scala|borderStyle=solid}
import scala.reflect.runtime.universe
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.SparkConf

object ReproduceSparkBug {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local")
      .setAppName("ReproduceSparkBug")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val schema = StructType(Array(
      StructField("userId", DataTypes.StringType),
      StructField("objectId", DataTypes.StringType),
      StructField("eventName", DataTypes.StringType),
      StructField("eventJson", DataTypes.StringType),
      StructField("timestamp", DataTypes.LongType)))

    val sc = new SparkContext(conf)

    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.format("com.databricks.spark.csv")
      .option("delimiter", "\t")
      .option("header", "false")
      .schema(schema).load("src/test/resources/foo.txt")

    // Keep only the "Created" events.
    val filtered = df.filter(df("eventName").endsWith("Created"))

    // Project objectId plus the UDF output.
    val extracted = filtered.select(filtered("objectId"),
      extractorUDF(filtered("eventJson"), filtered("objectId"), filtered("userId")) as "info")

    extracted.filter(extracted("info").notEqual("NO REFS"))
      .collect
      .map(r => (r.getString(0), r.getString(1)))
      .foreach(println(_))
    sc.stop()
  }

  def extractorUDF = udf(extractor(_: String, _: String, _: String))

  // The println makes every invocation of the UDF visible on stdout.
  def extractor(eventJson: String, objectId: String, userId: String): String = {
    println(eventJson + ":" + objectId + ":" + userId)
    eventJson + ":" + objectId + ":" + userId
  }
}
{code}

If "src/test/resources" contains the following:
{noformat}
113	0c38c6c327224e43a46f663b6424612f	Created	{"field":"value"}	1000
113	0c38c6c327224e43a46f663b6424612f	LabelRemoved	{"this":"should not appear"}	1000
{noformat}

Then the code outputs the following to stdout:

{noformat}
{"field":"value"}:0c38c6c327224e43a46f663b6424612f:113
{"field":"value"}:0c38c6c327224e43a46f663b6424612f:113
{"this":"should not appear"}:0c38c6c327224e43a46f663b6424612f:113
(0c38c6c327224e43a46f663b6424612f,{"field":"value"}:0c38c6c327224e43a46f663b6424612f:113)
{noformat}

If the first filter is cached, i.e. we write the following instead, then only the first and last lines of the output above appear:
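{code}
val filtered = df.filter(df("eventName").endsWith("Created")).cache
{code}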

What I think is happening is that the UDF is applied to the unfiltered data, with the filter only applied afterwards, so the correct rows are output in the end. It also seems the UDF gets applied more than once to the rows that survive the filter, for some reason.
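I haven't attached plans here, but printing the extended plan should show whether the UDF evaluation is placed below the eventName filter; a quick sketch, assuming the same variable names as in the code above:

{code}
// If the extended plan shows the UDF projection scheduled before the
// eventName filter, that would confirm it runs against the unfiltered scan.
extracted.explain(true)
{code}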

This caused problems in my original code, where JSON parsing was done inside the UDF and threw exceptions because the UDF was applied to rows that should have been filtered out. The original code read from Parquet, but I switched to a tab-separated format here to make things easier to see/post.
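As a stopgap, wrapping the extraction in a try/catch avoided the task failures; a minimal sketch of such a defensive variant (the safeExtractor name is my own, reusing the "NO REFS" sentinel from the code above):

{code}
  // Hypothetical defensive wrapper: any exception from a row that should
  // have been filtered out yields the sentinel instead of failing the task.
  def safeExtractor(eventJson: String, objectId: String, userId: String): String =
    try {
      extractor(eventJson, objectId, userId)
    } catch {
      case _: Exception => "NO REFS"
    }

  def safeExtractorUDF = udf(safeExtractor(_: String, _: String, _: String))
{code}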

I suspect the bug hasn't been noticed hitherto because the correct results do get produced in the end; the UDF would need to cause task failures when applied to the filtered-out rows before anyone would spot it.

Note that I could not reproduce this unless the data was read in from a file. 



