Posted to user@spark.apache.org by David Diebold <da...@gmail.com> on 2022/01/03 08:39:22 UTC

Re: Pyspark debugging best practices

Hello Andy,

Are you sure you want to perform lots of join operations, rather than simple
unions?
Are you doing inner joins or outer joins?
Can you give us a rough idea of your list size, plus each individual dataset
size?
Having a look at the execution plan would help; maybe the high number of join
operations makes the execution plan too complicated at the end of the day.
Checkpointing could help there.
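
For example, something along these lines (a rough sketch; "joined" stands in
for the accumulated result of your joins, and the checkpoint path is just a
placeholder):

    # print the parsed / analyzed / optimized / physical plans to see how
    # large they are getting
    joined.explain(True)

    # checkpoint() is eager by default: it materializes the data and returns
    # a DataFrame whose plan starts from the checkpointed files, truncating
    # the lineage built up by the joins
    spark.sparkContext.setCheckpointDir("gs://my-bucket/checkpoints")
    joined = joined.checkpoint()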

Cheers,
David


On Thu, Dec 30, 2021 at 4:56 PM Andrew Davidson <ae...@ucsc.edu.invalid>
wrote:

> Hi Gourav
>
> I will give databricks a try.
>
> Each dataset gets loaded into a data frame.
> I select one column from the data frame
> I join the column to the  accumulated joins from previous data frames in
> the list
>
> To debug, I think I am going to put an action and a log statement after each
> join. I do not think it will change the performance. I believe the physical
> plan will be the same; however, hopefully it will shed some light.
>
> At the very least I will know whether it is making progress or not, and
> hopefully where it is breaking.
>
> Happy new year
>
> Andy
>
> On Tue, Dec 28, 2021 at 4:19 AM Gourav Sengupta <go...@gmail.com>
> wrote:
>
>> Hi Andrew,
>>
>> Any chance you might give Databricks a try in GCP?
>>
>> The above transformations look complicated to me, why are you adding
>> dataframes to a list?
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>>
>>
>> On Sun, Dec 26, 2021 at 7:00 PM Andrew Davidson <ae...@ucsc.edu.invalid>
>> wrote:
>>
>>> Hi
>>>
>>>
>>>
>>> I am having trouble debugging my driver. It runs correctly on smaller
>>> data sets but fails on large ones. It is very hard to figure out what the
>>> bug is. I suspect it may have something to do with the way Spark is
>>> installed and configured. I am using Google Cloud Platform Dataproc
>>> PySpark.
>>>
>>>
>>>
>>> The log messages are not helpful. The error message will be something
>>> like
>>> "User application exited with status 1"
>>>
>>>
>>>
>>> And
>>>
>>>
>>>
>>> jsonPayload: {
>>>
>>> class: "server.TThreadPoolServer"
>>>
>>> filename: "hive-server2.log"
>>>
>>> message: "Error occurred during processing of message."
>>>
>>> thread: "HiveServer2-Handler-Pool: Thread-40"
>>>
>>> }
>>>
>>>
>>>
>>> I am able to access the spark history server however it does not capture
>>> anything if the driver crashes. I am unable to figure out how to access
>>> spark web UI.
>>>
>>>
>>>
>>> My driver program looks something like the pseudo code below: a long
>>> list of transforms with a single action (i.e. write) at the end. Adding
>>> log messages is not helpful because of lazy evaluation. I am tempted to
>>> add something like
>>>
>>>
>>>
>>> logger.warn( "DEBUG df.count():{}".format( df.count() ) ) to try and
>>> inline some sort of diagnostic message.
>>>
>>>
>>>
>>> What do you think?
>>>
>>>
>>>
>>> Is there a better way to debug this?
>>>
>>>
>>>
>>> Kind regards
>>>
>>>
>>>
>>> Andy
>>>
>>>
>>>
>>> def run():
>>>
>>>     listOfDF = []
>>>
>>>     for filePath in listOfFiles:
>>>
>>>         df = spark.read.load( filePath, ...)
>>>
>>>         listOfDF.append(df)
>>>
>>>
>>>
>>>
>>>
>>>     list2OfDF = []
>>>
>>>     for df in listOfDF:
>>>
>>>         df2 = df.select( .... )
>>>
>>>         list2OfDF.append( df2 )
>>>
>>>
>>>
>>>     # will setting the list to None free cache?
>>>
>>>     # or just driver memory
>>>
>>>     listOfDF = None
>>>
>>>
>>>
>>>
>>>
>>>     df3 = list2OfDF[0]
>>>
>>>
>>>
>>>     for i in range( 1, len(list2OfDF) ):
>>>
>>>         df = list2OfDF[i]
>>>
>>>         df3 = df3.join(df ...)
>>>
>>>
>>>
>>>     # will setting the list to None free cache?
>>>
>>>     # or just driver memory
>>>
>>>     list2OfDF = None
>>>
>>>
>>>
>>>
>>>
>>>     lots of narrow transformations on df3
>>>
>>>
>>>
>>>     return df3
>>>
>>>
>>>
>>> def main() :
>>>
>>>     df = run()
>>>
>>>     df.write()
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>

Re: Joining many tables Re: Pyspark debugging best practices

Posted by Sonal Goyal <so...@gmail.com>.
Hi Andrew,

Do you think the following would work?

Build data frames by appending a source column to each (sampleName). Add
extra columns as per the schema in quantSchema. Then union them, so you get
one data frame with many entries per Name. You can then use windowing
functions over them.
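
A rough sketch of the idea (borrowing fileList, sampleNamesList and
quantSchema from Andrew's code quoted below; the reshape here uses
groupBy().pivot() to get one column per sample, a window function would be
another way to go):

    import functools
    from pyspark.sql import functions as F

    longDFs = []
    for quantFile, sampleName in zip(fileList, sampleNamesList):
        df = (spark.read.load(quantFile, format="csv", sep="\t",
                              schema=quantSchema, header="true")
                   .select("Name", "NumReads")
                   .withColumn("sample", F.lit(sampleName)))   # tag the source
        longDFs.append(df)

    # one tall data frame with many entries per Name
    longDF = functools.reduce(lambda a, b: a.unionByName(b), longDFs)

    # wide format: one row per Name, one NumReads column per sample
    wideDF = longDF.groupBy("Name").pivot("sample").agg(F.first("NumReads"))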

On Tue, 4 Jan 2022 at 6:29 AM, Andrew Davidson <ae...@ucsc.edu.invalid>
wrote:

> Hi David
>
>
>
> I need to select 1 column from many files and combine them into a single
> table.
>
>
>
> I do not believe union() will work. It appends rows, not columns.
>
>
>
> As far as I know join() is the only way to append columns from different
> data frames.
>
>
>
> I think you are correct that lazy evaluation over a lot of joins may
> make the execution plan too complicated. To debug I added
>
>
>
> logger.warn( "i:{}, num file rows:{} num joined rows:{}".format(i,
> df.count(), retDF.count() ) )
>
>
>
> to try and simplify the execution plan.
>
>
>
> Once I set spark.sql.autoBroadcastJoinThreshold=-1 my big job started
> making some progress; however, it fails after a few files. Resources are
> maxed out!
>
>
>
> I estimated that the raw data should be < 500 GB. I am running a cluster
> with 2.8 TB, which should be more than enough to cover Spark overhead.
>
>
>
> Is spark integrated with the python garbage collector?
>
>
>
> I assume createOrReplaceTempView() would cause cache to get flushed as
> needed?
>
>
>
> Kind regards
>
>
>
> Andy
>
>
>
>     ###############################################################################
>     def _loadSalmonReadsTable(self):
>         '''
>         AEDWIP TODO
>         '''
>         self.logger.info( "BEGIN" )
>         retNumReadsDF = None
>         quantSchema = "`Name` STRING, `Length` INT, `EffectiveLength` DOUBLE, `TPM` DOUBLE, `NumReads` DOUBLE "
>         for i in range( len(self.fileList) ):
>             #
>             # get NumReads from next salmon quant file
>             #
>             quantFile = self.fileList[i]
>             sampleDF = self.spark.read.load( quantFile, format="csv", sep="\t",
>                                      schema=quantSchema, header="true" )
>                                     # did not fix bug .repartition(50)
>
>             sampleName = self.sampleNamesList[i]
>             sampleDF = sampleDF.select( ["Name", "NumReads"] )\
>                             .withColumnRenamed( "NumReads", sampleName )
>
>             sampleDF.createOrReplaceTempView( "sample" )
>
>             self.logger.warn("AEDWIP i:{} sampleName:{} sampleDF.num rows:{} num cols:{} num parts:{}"
>                              .format(i, sampleName, sampleDF.count(), len(sampleDF.columns), sampleDF.rdd.getNumPartitions()))
>
>             #
>             # append NumReads to table of reads
>             #
>
>             # the sample name must be quoted else column names with a '-'
>             # like 1117F-0426-SM-5EGHI will generate an error
>             # spark thinks the '-' is an expression. '_' is also
>             # a special char for the sql like operator
>             # https://stackoverflow.com/a/63899306/4586180
>             sqlStmt = '\t\t\t\t\t\tselect rc.*, `{}` \n\
>                             from \n\
>                                retNumReadsDF as rc, \n\
>                                sample  \n\
>                             where \n\
>                                 rc.Name == sample.Name \n'.format( sampleName )
>
>             self.logger.debug( "sqlStmt:\n{}\n".format( sqlStmt ) )
>             if i == 0 :
>                 retNumReadsDF = sampleDF
>             else :
>                 retNumReadsDF = self.spark.sql( sqlStmt )
>
>             retNumReadsDF.createOrReplaceTempView( "retNumReadsDF" )
>
>             #
>             # debug. seems like we do not make progress when we run on training
>             # nothing happens, logs do not change, cluster metrics drop suggesting no work
>             # is being done
>             # add an action to try and debug
>             # this should not change the physical plan. I.e. we still have the same number of shuffles
>             # which results in the same number of stages. We are just not building up a plan with thousands
>             # of stages.
>             #
>             self.logger.warn("AEDWIP i:{} retNumReadsDF.num rows:{} num cols:{} num parts:{}"
>                              .format(i, retNumReadsDF.count(), len(retNumReadsDF.columns), retNumReadsDF.rdd.getNumPartitions()) )
>
>             #
>             # TODO AEDWIP spark analyze chapter 18 debugging joins
>
>             # execution plan should be the same for each join
>             #rawCountsSDF.explain()
>
>         self.logger.info( "END\n" )
>         return retNumReadsDF
>
>
>
>
>
> From: David Diebold <da...@gmail.com>
> Date: Monday, January 3, 2022 at 12:39 AM
> To: Andrew Davidson <ae...@ucsc.edu.invalid>, "user @spark" <user@spark.apache.org>
> Subject: Re: Pyspark debugging best practices
>
>
>
> Hello Andy,
>
>
>
> Are you sure you want to perform lots of join operations, rather than
> simple unions?
>
> Are you doing inner joins or outer joins?
>
> Can you give us a rough idea of your list size, plus each individual
> dataset size?
>
> Having a look at the execution plan would help; maybe the high number of
> join operations makes the execution plan too complicated at the end of the
> day. Checkpointing could help there.
>
>
>
> Cheers,
>
> David
>
>
>
>
>
> On Thu, Dec 30, 2021 at 4:56 PM Andrew Davidson <ae...@ucsc.edu.invalid>
> wrote:
>
> Hi Gourav
>
>
>
> I will give databricks a try.
>
>
>
> Each dataset gets loaded into a data frame.
>
> I select one column from the data frame
>
> I join the column to the  accumulated joins from previous data frames in
> the list
>
>
>
> To debug, I think I am going to put an action and a log statement after each
> join. I do not think it will change the performance. I believe the physical
> plan will be the same; however, hopefully it will shed some light.
>
>
>
> At the very least I will know whether it is making progress or not, and
> hopefully where it is breaking.
>
>
>
> Happy new year
>
>
>
> Andy
>
>
>
> On Tue, Dec 28, 2021 at 4:19 AM Gourav Sengupta <go...@gmail.com>
> wrote:
>
> Hi Andrew,
>
>
>
> Any chance you might give Databricks a try in GCP?
>
>
>
> The above transformations look complicated to me, why are you adding
> dataframes to a list?
>
>
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
>
>
>
>
> On Sun, Dec 26, 2021 at 7:00 PM Andrew Davidson <ae...@ucsc.edu.invalid>
> wrote:
>
> Hi
>
>
>
> I am having trouble debugging my driver. It runs correctly on smaller data
> sets but fails on large ones. It is very hard to figure out what the bug
> is. I suspect it may have something to do with the way Spark is installed
> and configured. I am using Google Cloud Platform Dataproc PySpark.
>
>
>
> The log messages are not helpful. The error message will be something like
> "User application exited with status 1"
>
>
>
> And
>
>
>
> jsonPayload: {
>
> class: "server.TThreadPoolServer"
>
> filename: "hive-server2.log"
>
> message: "Error occurred during processing of message."
>
> thread: "HiveServer2-Handler-Pool: Thread-40"
>
> }
>
>
>
> I am able to access the spark history server however it does not capture
> anything if the driver crashes. I am unable to figure out how to access
> spark web UI.
>
>
>
> My driver program looks something like the pseudo code below: a long list
> of transforms with a single action (i.e. write) at the end. Adding log
> messages is not helpful because of lazy evaluation. I am tempted to add
> something like
>
>
>
> logger.warn( "DEBUG df.count():{}".format( df.count() ) ) to try and inline
> some sort of diagnostic message.
>
>
>
> What do you think?
>
>
>
> Is there a better way to debug this?
>
>
>
> Kind regards
>
>
>
> Andy
>
>
>
> def run():
>
>     listOfDF = []
>
>     for filePath in listOfFiles:
>
>         df = spark.read.load( filePath, ...)
>
>         listOfDF.append(df)
>
>
>
>
>
>     list2OfDF = []
>
>     for df in listOfDF:
>
>         df2 = df.select( .... )
>
>         list2OfDF.append( df2 )
>
>
>
>     # will setting the list to None free cache?
>
>     # or just driver memory
>
>     listOfDF = None
>
>
>
>
>
>     df3 = list2OfDF[0]
>
>
>
>     for i in range( 1, len(list2OfDF) ):
>
>         df = list2OfDF[i]
>
>         df3 = df3.join(df ...)
>
>
>
>     # will setting the list to None free cache?
>
>     # or just driver memory
>
>     list2OfDF = None
>
>
>
>
>
>     lots of narrow transformations on df3
>
>
>
>     return df3
>
>
>
> def main() :
>
>     df = run()
>
>     df.write()
>
>
>
>
>
>
>
> --
Cheers,
Sonal
https://github.com/zinggAI/zingg

Joining many tables Re: Pyspark debugging best practices

Posted by Andrew Davidson <ae...@ucsc.edu.INVALID>.
Hi David

I need to select 1 column from many files and combine them into a single table.

I do not believe union() will work. It appends rows, not columns.

As far as I know join() is the only way to append columns from different data frames.

I think you are correct that lazy evaluation over a lot of joins may make the execution plan too complicated. To debug I added

logger.warn( "i:{}, num file rows:{} num joined rows:{}".format(i, df.count(), retDF.count() ) )

to try and simplify the execution plan.


Once I set spark.sql.autoBroadcastJoinThreshold=-1 my big job started making some progress; however, it fails after a few files. Resources are maxed out!
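
For reference, a minimal way to apply that setting (the app name is just a
placeholder), either when the session is built or on a live session:

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("quant-joins")
             .config("spark.sql.autoBroadcastJoinThreshold", "-1")
             .getOrCreate())

    # or, on an existing session:
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)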



I estimated that the raw data should be < 500 GB. I am running a cluster with 2.8 TB, which should be more than enough to cover Spark overhead.



Is spark integrated with the python garbage collector?



I assume createOrReplaceTempView() would cause cache to get flushed as needed?
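
As far as I can tell, createOrReplaceTempView() only registers a name for the
query plan; it does not cache anything by itself, so there is nothing for it
to flush. Caching and freeing are explicit, roughly:

    df.cache()        # mark the data frame for caching
    df.count()        # an action is what actually materializes the cache
    # ... reuse df in several queries ...
    df.unpersist()    # explicitly release the cached blocks when done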



Kind regards



Andy


    ###############################################################################
    def _loadSalmonReadsTable(self):
        '''
        AEDWIP TODO
        '''
        self.logger.info( "BEGIN" )
        retNumReadsDF = None
        quantSchema = "`Name` STRING, `Length` INT, `EffectiveLength` DOUBLE, `TPM` DOUBLE, `NumReads` DOUBLE "
        for i in range( len(self.fileList) ):
            #
            # get NumReads from next salmon quant file
            #
            quantFile = self.fileList[i]
            sampleDF = self.spark.read.load( quantFile, format="csv", sep="\t",
                                     schema=quantSchema, header="true" )
                                    # did not fix bug .repartition(50)

            sampleName = self.sampleNamesList[i]
            sampleDF = sampleDF.select( ["Name", "NumReads"] )\
                            .withColumnRenamed( "NumReads", sampleName )

            sampleDF.createOrReplaceTempView( "sample" )

            self.logger.warn("AEDWIP i:{} sampleName:{} sampleDF.num rows:{} num cols:{} num parts:{}"
                             .format(i, sampleName, sampleDF.count(), len(sampleDF.columns), sampleDF.rdd.getNumPartitions()))

            #
            # append NumReads to table of reads
            #

            # the sample name must be quoted else column names with a '-'
            # like 1117F-0426-SM-5EGHI will generate an error
            # spark thinks the '-' is an expression. '_' is also
            # a special char for the sql like operator
            # https://stackoverflow.com/a/63899306/4586180
            sqlStmt = '\t\t\t\t\t\tselect rc.*, `{}` \n\
                            from \n\
                               retNumReadsDF as rc, \n\
                               sample  \n\
                            where \n\
                                rc.Name == sample.Name \n'.format( sampleName )

            self.logger.debug( "sqlStmt:\n{}\n".format( sqlStmt ) )
            if i == 0 :
                retNumReadsDF = sampleDF
            else :
                retNumReadsDF = self.spark.sql( sqlStmt )

            retNumReadsDF.createOrReplaceTempView( "retNumReadsDF" )

            #
            # debug. seems like we do not make progress when we run on training
            # nothing happens, logs do not change, cluster metrics drop suggesting no work
            # is being done
            # add an action to try and debug
            # this should not change the physical plan. I.e. we still have the same number of shuffles
            # which results in the same number of stages. We are just not building up a plan with thousands
            # of stages.
            #
            self.logger.warn("AEDWIP i:{} retNumReadsDF.num rows:{} num cols:{} num parts:{}"
                             .format(i, retNumReadsDF.count(), len(retNumReadsDF.columns), retNumReadsDF.rdd.getNumPartitions()) )

            #
            # TODO AEDWIP spark analyze chapter 18 debugging joins

            # execution plan should be the same for each join
            #rawCountsSDF.explain()

        self.logger.info( "END\n" )
        return retNumReadsDF
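
A rough sketch of what David's checkpointing suggestion might look like in
this loop, using DataFrame.join() instead of the SQL statement and a made-up
cadence (it assumes spark.sparkContext.setCheckpointDir(...) was called at
startup):

    CHECKPOINT_EVERY = 10   # placeholder; tune for the cluster

    retNumReadsDF = None
    for i in range( len(self.fileList) ):
        sampleName = self.sampleNamesList[i]
        sampleDF = (self.spark.read.load(self.fileList[i], format="csv", sep="\t",
                                         schema=quantSchema, header="true")
                        .select("Name", "NumReads")
                        .withColumnRenamed("NumReads", sampleName))

        if retNumReadsDF is None:
            retNumReadsDF = sampleDF
        else:
            retNumReadsDF = retNumReadsDF.join(sampleDF, on="Name", how="inner")

        if (i + 1) % CHECKPOINT_EVERY == 0:
            # eager by default: materializes the data and replaces the long
            # lineage with a plan that reads the checkpointed files
            retNumReadsDF = retNumReadsDF.checkpoint()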


From: David Diebold <da...@gmail.com>
Date: Monday, January 3, 2022 at 12:39 AM
To: Andrew Davidson <ae...@ucsc.edu.invalid>, "user @spark" <us...@spark.apache.org>
Subject: Re: Pyspark debugging best practices

Hello Andy,

Are you sure you want to perform lots of join operations, rather than simple unions?
Are you doing inner joins or outer joins?
Can you give us a rough idea of your list size, plus each individual dataset size?
Having a look at the execution plan would help; maybe the high number of join operations makes the execution plan too complicated at the end of the day. Checkpointing could help there.

Cheers,
David


On Thu, Dec 30, 2021 at 4:56 PM Andrew Davidson <ae...@ucsc.edu.invalid> wrote:
Hi Gourav

I will give databricks a try.

Each dataset gets loaded into a data frame.
I select one column from the data frame
I join the column to the  accumulated joins from previous data frames in the list

To debug, I think I am going to put an action and a log statement after each join. I do not think it will change the performance. I believe the physical plan will be the same; however, hopefully it will shed some light.

At the very least I will know whether it is making progress or not, and hopefully where it is breaking.

Happy new year

Andy

On Tue, Dec 28, 2021 at 4:19 AM Gourav Sengupta <go...@gmail.com> wrote:
Hi Andrew,

Any chance you might give Databricks a try in GCP?

The above transformations look complicated to me, why are you adding dataframes to a list?


Regards,
Gourav Sengupta



On Sun, Dec 26, 2021 at 7:00 PM Andrew Davidson <ae...@ucsc.edu.invalid> wrote:
Hi

I am having trouble debugging my driver. It runs correctly on smaller data sets but fails on large ones. It is very hard to figure out what the bug is. I suspect it may have something to do with the way Spark is installed and configured. I am using Google Cloud Platform Dataproc PySpark.

The log messages are not helpful. The error message will be something like
"User application exited with status 1"

And

jsonPayload: {
class: "server.TThreadPoolServer"
filename: "hive-server2.log"
message: "Error occurred during processing of message."
thread: "HiveServer2-Handler-Pool: Thread-40"
}

I am able to access the spark history server however it does not capture anything if the driver crashes. I am unable to figure out how to access spark web UI.

My driver program looks something like the pseudo code below: a long list of transforms with a single action (i.e. write) at the end. Adding log messages is not helpful because of lazy evaluation. I am tempted to add something like

logger.warn( "DEBUG df.count():{}".format( df.count() ) ) to try and inline some sort of diagnostic message.

What do you think?

Is there a better way to debug this?

Kind regards

Andy

def run():
    listOfDF = []
    for filePath in listOfFiles:
        df = spark.read.load( filePath, ...)
        listOfDF.append(df)


    list2OfDF = []
    for df in listOfDF:
        df2 = df.select( .... )
        list2OfDF.append( df2 )

    # will setting the list to None free cache?
    # or just driver memory
    listOfDF = None


    df3 = list2OfDF[0]

    for i in range( 1, len(list2OfDF) ):
        df = list2OfDF[i]
        df3 = df3.join(df ...)

    # will setting the list to None free cache?
    # or just driver memory
    list2OfDF = None


    lots of narrow transformations on df3

    return df3

def main() :
    df = run()
    df.write()