Posted to issues@spark.apache.org by "Leona Yoda (Jira)" <ji...@apache.org> on 2022/02/01 03:11:00 UTC

[jira] [Commented] (SPARK-37958) Pyspark SparkContext.AddFile() does not respect spark.files.overwrite

    [ https://issues.apache.org/jira/browse/SPARK-37958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17485032#comment-17485032 ] 

Leona Yoda commented on SPARK-37958:
------------------------------------

This is probably the default behavior in general, not only on k8s or in a PySpark environment.

{{spark.files.overwrite}} was introduced in 1.0. Later, the PR [https://github.com/apache/spark/pull/14396] disallowed calling addFile twice for a file with the same name on the driver side.

By setting this option to true, users can overwrite files that already existed at startup by calling addFile or addJar, which is prohibited under the default configuration.
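
As a rough illustration of that executor-side behavior, here is a minimal pure-Python sketch. It is a simplified model and not Spark's actual code: the function name {{copy_fetched_file}} and the {{demo}} helper are hypothetical, and the file contents simply mirror the example in this comment.

```python
import filecmp
import shutil
import tempfile
from pathlib import Path


def copy_fetched_file(source: Path, dest: Path, overwrite: bool) -> str:
    """Hypothetical model of placing a fetched file into an executor's work dir."""
    if dest.exists():
        if filecmp.cmp(source, dest, shallow=False):
            return "unchanged"  # identical contents: nothing to do
        if not overwrite:
            # Models the error seen with spark.files.overwrite=false
            raise RuntimeError(
                f"File {dest} exists and does not match contents of {source}"
            )
        dest.unlink()  # overwrite=True: drop the stale copy first
        shutil.copyfile(source, dest)
        return "overwritten"
    shutil.copyfile(source, dest)
    return "copied"


def demo() -> list:
    """Replay both settings against a file that already exists at startup."""
    results = []
    with tempfile.TemporaryDirectory() as tmp:
        fetched = Path(tmp) / "fetched.txt"
        existing = Path(tmp) / "test.txt"
        fetched.write_text("Hello\nspark\non\nk8s\n")
        existing.write_text("This is original file\n")
        try:
            copy_fetched_file(fetched, existing, overwrite=False)
        except RuntimeError:
            results.append("error")
        results.append(copy_fetched_file(fetched, existing, overwrite=True))
        results.append(existing.read_text().splitlines()[0])
    return results
```

In this model the flag only matters when a file with different contents is already present at the destination, which is the case the option appears to cover.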

 
{code:java}
$ export K8S_ENDPOINT="https://192.168.49.2:8443" # minikube
$ export SPARK_IMAGE="spark:add-file-in-advance" # contains /opt/spark/work-dir/test.txt
$ docker run -i $SPARK_IMAGE cat /opt/spark/work-dir/test.txt
...
This is original file

# put a new file on driver's side
$ cat work-dir/test.txt
Hello
spark
on
k8s


# spark.files.overwrite=false
$ ./bin/spark-shell \
  --master k8s://${K8S_ENDPOINT} \
  --deploy-mode client \
  --name spark-shell \
  --conf spark.executor.instances=1 \
  --conf spark.kubernetes.container.image=${SPARK_IMAGE} \
  --conf spark.kubernetes.container.image.pullPolicy=Never \
  --conf spark.files.overwrite=false
scala> spark.read.format("text").load("work-dir/test.txt").show()
+--------------------+
|               value|
+--------------------+
|This is original ...|
+--------------------+
scala> sc.addFile("work-dir/test.txt")
scala> spark.read.format("text").load("work-dir/test.txt").show()
22/02/01 02:48:02 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (172.17.0.4 executor 1): org.apache.spark.SparkException: File ./test.txt exists and does not match contents of spark://...


# spark.files.overwrite=true
$ ./bin/spark-shell \
  --master k8s://${K8S_ENDPOINT} \
  --deploy-mode client \
  --name spark-shell \
  --conf spark.executor.instances=1 \
  --conf spark.kubernetes.container.image=${SPARK_IMAGE} \
  --conf spark.kubernetes.container.image.pullPolicy=Never \
  --conf spark.files.overwrite=true

scala> sc.addFile("work-dir/test.txt")
scala> spark.read.format("text").load("work-dir/test.txt").show() 
+-----+
|value|
+-----+
|Hello|
|spark|
|   on|
|  k8s|
+-----+
scala> sc.addFile("work-dir/test.txt")
22/02/01 02:54:08 WARN SparkContext: The path work-dir/test.txt has been added already. Overwriting of added paths is not supported in the current version
{code}
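
The driver-side check seen at the end of the second session can be pictured with the following sketch. It is only a model under assumptions: the class {{AddedFiles}} and its fields are hypothetical names, not Spark's implementation. It shows a registry that accepts a path once and afterwards only records a warning, whatever the value of {{spark.files.overwrite}}.

```python
import time


class AddedFiles:
    """Hypothetical stand-in for the driver's record of added files."""

    def __init__(self):
        self._added = {}    # path -> timestamp of the first registration
        self.warnings = []  # collected instead of being logged

    def add_file(self, path: str) -> bool:
        """Register a path once; later calls for the same path are ignored."""
        if path in self._added:
            self.warnings.append(
                f"The path {path} has been added already. "
                "Overwriting of added paths is not supported in the current version."
            )
            return False
        self._added[path] = time.time()
        return True


files = AddedFiles()
first = files.add_file("work-dir/test.txt")   # accepted
second = files.add_file("work-dir/test.txt")  # rejected with a warning
```

Note that this model takes no overwrite flag at all, which matches the observation that the warning appears even with {{spark.files.overwrite=true}}.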
 

It might be better to consider removing this option: the PR aimed to avoid concurrency problems, but the behavior above could cause exactly such a problem.

 

In any case, the description is misleading, so I would like to open a PR to update it.

> Pyspark SparkContext.AddFile() does not respect spark.files.overwrite
> ---------------------------------------------------------------------
>
>                 Key: SPARK-37958
>                 URL: https://issues.apache.org/jira/browse/SPARK-37958
>             Project: Spark
>          Issue Type: Bug
>          Components: Documentation, Input/Output, Java API
>    Affects Versions: 3.1.1
>            Reporter: taylor schneider
>            Priority: Major
>
> I am currently running Apache Spark 3.1.1 on Kubernetes.
> When I try to re-add a file that has already been added, I see that the updated file is not actually loaded into the cluster. I see the following warning when calling the addFile() function.
> {code:java}
> 22/01/18 19:05:50 WARN SparkContext: The path http://15.4.12.12:80/demo_data.csv has been added already. Overwriting of added paths is not supported in the current version. {code}
> When I display the dataframe that was loaded, I see that the old data is loaded. If I log into the worker pods and delete the file, the same results are observed.
> My SparkConf has the following configurations
> {code:java}
> ('spark.master', 'k8s://https://15.4.7.11:6443')
> ('spark.app.name', 'spark-jupyter-mlib')
> ('spark.submit.deploy.mode', 'cluster')
> ('spark.kubernetes.container.image', 'tschneider/apache-spark-k8:v7')
> ('spark.kubernetes.namespace', 'spark')
> ('spark.kubernetes.pyspark.pythonVersion', '3')
> ('spark.kubernetes.authenticate.driver.serviceAccountName', 'spark-sa')
> ('spark.kubernetes.authenticate.serviceAccountName', 'spark-sa')
> ('spark.executor.instances', '3')
> ('spark.executor.cores', '2')
> ('spark.executor.memory', '4096m')
> ('spark.executor.memoryOverhead', '1024m')
> ('spark.driver.memory', '1024m')
> ('spark.driver.host', '15.4.12.12')
> ('spark.files.overwrite', 'true')
> ('spark.files.useFetchCache', 'false') {code}
> According to the documentation for 3.1.1, the spark.files.overwrite parameter should in fact load the updated files. The documentation can be found here: [https://spark.apache.org/docs/3.1.1/configuration.html]
> The only workaround is to use a Python function to manually delete and re-download the file. Calling addFile still shows the warning in this case. My code for the delete and re-download is as follows:
> {code:java}
> def os_remove(file_path):
>     # Runs on an executor: delete the stale copy if present and report the host.
>     import os
>     import socket
>     hostname = socket.gethostname()
>     action = None
>     if os.path.exists(file_path):
>         action = "delete"
>         os.remove(file_path)
>     return (hostname, action)
>
> worker_file_path = u"file:///{0}".format(csv_file_name)
> worker_count = int(spark_session.conf.get('spark.executor.instances'))
> rdd = sc.parallelize(range(worker_count)).map(lambda var: os_remove(worker_file_path))
> rdd.collect()
>
> def download_updated_file(file_url):
>     # Runs on an executor: download the file again into the root directory.
>     import os
>     import urllib.parse as parse
>     import urllib.request as request
>     file_name = os.path.basename(parse.urlparse(file_url).path)
>     local_file_path = "/{0}".format(file_name)
>     request.urlretrieve(file_url, local_file_path)
>
> rdd = sc.parallelize(range(worker_count)).map(lambda var: download_updated_file(csv_file_url))
> rdd.collect(){code}
> I believe this is either a bug or a documentation mistake. Perhaps the configuration parameter has a misleading description?