Posted to issues@spark.apache.org by "Sean R. Owen (Jira)" <ji...@apache.org> on 2021/03/24 14:50:00 UTC
[jira] [Commented] (SPARK-34510) .foreachPartition command hangs
when ran inside Python package but works when ran from Python file outside
the package on EMR
[ https://issues.apache.org/jira/browse/SPARK-34510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17307879#comment-17307879 ]
Sean R. Owen commented on SPARK-34510:
--------------------------------------
Firstly, does this happen on Apache Spark or just EMR?
Are you sure it's not just taking a long time to read data from S3?
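One further thing that may be worth ruling out, purely as a hypothesis and not something confirmed in this ticket: PySpark serializes the function passed to foreachPartition with cloudpickle, which pickles functions defined in __main__ by value but functions from importable modules by reference. If that applies here, each executor would have to import the module that defines the function, and would run that module's top-level code (including anything it imports, such as a module that builds a SparkSession). The sketch below uses only the standard-library pickle and a hypothetical module named demo_repo to illustrate the mechanism; it does not involve Spark, and the names demo_repo, handle_partition and DEMO_IMPORT_COUNT are made up for the illustration.

```python
import importlib
import os
import pickle
import sys
import tempfile
import textwrap

# Hypothetical stand-in for a package module: it has a top-level side
# effect (a counter in os.environ) and a function to be shipped elsewhere.
MODULE_SOURCE = textwrap.dedent(
    '''
    import os

    # Top-level side effect; stands in for work done at import time.
    os.environ["DEMO_IMPORT_COUNT"] = str(
        int(os.environ.get("DEMO_IMPORT_COUNT", "0")) + 1
    )

    def handle_partition(iterator):
        return [str(record) for record in iterator]
    '''
)


def demo():
    os.environ.pop("DEMO_IMPORT_COUNT", None)
    with tempfile.TemporaryDirectory() as tmp:
        with open(os.path.join(tmp, "demo_repo.py"), "w") as f:
            f.write(MODULE_SOURCE)
        sys.path.insert(0, tmp)
        try:
            importlib.invalidate_caches()
            demo_repo = importlib.import_module("demo_repo")  # first import

            # A module-level function is pickled by reference: the payload
            # carries the module name and function name, not the code.
            payload = pickle.dumps(demo_repo.handle_partition)
            by_reference = (
                b"demo_repo" in payload and b"handle_partition" in payload
            )

            # Simulate a process that has not imported the module yet.
            del sys.modules["demo_repo"]

            # Unpickling imports the module again, re-running its
            # top-level code before the function can be called.
            restored = pickle.loads(payload)
            import_count = int(os.environ["DEMO_IMPORT_COUNT"])
            result = restored(iter([1, 2]))
        finally:
            sys.path.remove(tmp)
            sys.modules.pop("demo_repo", None)
    return by_reference, import_count, result


if __name__ == "__main__":
    print(demo())
```

If this turns out to be relevant, a quick experiment would be to check whether the hang goes away when the module that defines the partition function has no import-time side effects.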
> .foreachPartition command hangs when ran inside Python package but works when ran from Python file outside the package on EMR
> -----------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-34510
> URL: https://issues.apache.org/jira/browse/SPARK-34510
> Project: Spark
> Issue Type: Bug
> Components: EC2, PySpark
> Affects Versions: 3.0.0
> Reporter: Yuriy
> Priority: Minor
> Attachments: Code.zip
>
>
> I'm running PySpark 3.0.0 on EMR with the project structure below. process.py controls the flow of the application and calls code inside the _file_processor_ package. The job hangs when the .foreachPartition code located inside _s3_repo.py_ is called by _process.py_. When the same .foreachPartition code is moved out of _s3_repo.py_ and placed inside _process.py_, it runs fine.
> {code:java}
> process.py
> file_processor
>     config
>         spark.py
>     repository
>         s3_repo.py
>     structure
>         table_creator.py
> {code}
> *process.py*
> {code:java}
> from file_processor.structure import table_creator
> from file_processor.repository import s3_repo
>
> def process():
>     table_creator.create_table()
>     s3_repo.save_to_s3()
>
> if __name__ == '__main__':
>     process()
> {code}
> *spark.py*
> {code:java}
> from pyspark.sql import SparkSession
> spark_session = SparkSession.builder.appName("Test").getOrCreate()
> {code}
> *s3_repo.py*
> {code:java}
> from file_processor.config.spark import spark_session
>
> def save_to_s3():
>     spark_session.sql('SELECT * FROM rawFileData').toJSON().foreachPartition(_save_to_s3)
>
> def _save_to_s3(iterator):
>     for record in iterator:
>         print(record)
> {code}
> *table_creator.py*
> {code:java}
> from file_processor.config.spark import spark_session
> from pyspark.sql import Row
>
> def create_table():
>     file_contents = [
>         {'line_num': 1, 'contents': 'line 1'},
>         {'line_num': 2, 'contents': 'line 2'},
>         {'line_num': 3, 'contents': 'line 3'}
>     ]
>     spark_session.createDataFrame(Row(**row) for row in file_contents).cache().createOrReplaceTempView("rawFileData")
> {code}