You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Rasmus Schøler Sørensen (Jira)" <ji...@apache.org> on 2023/10/24 12:11:00 UTC

[jira] [Commented] (SPARK-31836) input_file_name() gives wrong value following Python UDF usage

    [ https://issues.apache.org/jira/browse/SPARK-31836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779048#comment-17779048 ] 

Rasmus Schøler Sørensen commented on SPARK-31836:
-------------------------------------------------

We have also encountered this bug. Rather unfortunate that this bug has persisted for at least 3.5 years without resolution.

We would like to do what we can to help resolve this issue.

In the mean time, I guess we will mitigate this issue by first loading the raw file data into a "raw" table (using `input_file_name()` to populate column with source input file name column), then process the raw table and apply the UDF in a second step, outputting to a second table.

For the record, I've included our observations regarding the extend of this bug below:
h2. Findings:

The issue occurs whenever a Python UDF is used, both when using `spark.read` and when using `spark.readStream`.
We did not observe any cases where the read method would affect whether the bug manifested or not (i.e. `spark.read` vs `spark.readStream.text` vs 'cloudFiles' stream).
In all cases, the bug only manifested in when `input_file_name()` was used in conjunction with a UDF.

The issue was observed in the following versions, regardless of whether the UDF was placed before or after `input_file_name()`:
 - Spark 3.5.0 (Databricks Runtime 14.1).
 - Spark 3.4.1 (Databricks Runtime 13.3).
 - Spark 3.3.2 (Databricks Runtime 12.2).
 - Spark 3.3.0 (Databricks Runtime 11.3).

For the following versions, we only observed the issue when the UDF column was placed *before* `input_file_name()`:
 - Spark 3.2.1 (Databricks Runtime 10.4).
 - Spark 3.1.2 (Databricks Runtime  9.1).

 
h2. Methodology:

We tested four ways of loading data:
 # Using `spark.read`, without a Python UDF.
 # Using `spark.read`, with a Python UDF.
 # Using `spark.readStream`, without a Python UDF.
 # Using `spark.readStream`, with a Python UDF.

The following read methods and formats were tested:
 - Raw text-file read: `spark.read.format('text').load(...)`
 - Text-file stream: `spark.readStream.text(...)`.
 - 'cloudFiles' text stream: `spark.readStream.format('cloudFiles').option("cloudFiles.format", "text").load(...)`

Input data consisted of a single folder with 2206 text files, each text file containing an average of 732 lines, with each line representing a single value (in this case, a file path), in total 1615051 rows/lines across all files.

All reads were output to a delta table. The delta-table was subsequently analyzed for number of distinct values of the `input_file_name` column.
In cases where the bug manifested, the number of distinct files was typically around 70-140 (with the expected/correct number being 2206).

Everything was run inside a "Databricks" environment. Note that Databricks sometimes adds some "special sauce" to their version of Spark, although generally the "Databricks Spark" is very close to standard Apache Spark.

The cluster used for all tests was a 4-core single-node cluster.

 

> input_file_name() gives wrong value following Python UDF usage
> --------------------------------------------------------------
>
>                 Key: SPARK-31836
>                 URL: https://issues.apache.org/jira/browse/SPARK-31836
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.4.5, 3.0.0
>            Reporter: Wesley Hildebrandt
>            Priority: Major
>
> I'm using PySpark for Spark 3.0.0 RC1 with Python 3.6.8.
> The following commands demonstrate that the input_file_name() function sometimes returns the wrong filename following usage of a Python UDF:
> {code}
> $ for i in `seq 5`; do echo $i > /tmp/test-file-$i; done
> $ pyspark
> >>> import pyspark.sql.functions as F
> >>> spark.readStream.text('file:///tmp/test-file-*', wholetext=True).withColumn('file1', F.input_file_name()).withColumn('udf', F.udf(lambda x:x)('value')).withColumn('file2', F.input_file_name()).writeStream.trigger(once=True).foreachBatch(lambda df,_: df.select('file1','file2').show(truncate=False, vertical=True)).start().awaitTermination()
> {code}
> A few notes about this bug:
>  * It happens with many different files, so it's not related to the file contents
>  * It also happens loading files from HDFS, so storage location is not a factor
>  * It also happens using .csv() to read the files instead of .text(), so input format is not a factor
>  * I have not been able to cause the error without using readStream, so it seems to be related to streaming
>  * The bug also happens using spark-submit to send a job to my cluster
>  * I haven't tested an older version, but it's possible that Spark pulls 24958 and 25321([https://github.com/apache/spark/pull/24958], [https://github.com/apache/spark/pull/25321]) to fix issue 28153 (https://issues.apache.org/jira/browse/SPARK-28153) introduced this bug?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org