Posted to user@spark.apache.org by Andrew Davidson <ae...@ucsc.edu.INVALID> on 2022/01/07 01:13:15 UTC

How to add a row number column without reordering my data frame

Hi

I am trying to work through an OOM error. I have 10,411 files. I want to select a single column from each file and then join them all into a single table.

Each file has a unique row id; however, it is a very long string. The data file with just the id and the column of interest is about 470 MB; the column of interest alone is about 21 MB. It is a column of over 5 million real numbers.

So I thought I could save a lot of memory if I joined on row numbers instead.

# create a dummy variable to order by  https://www.py4u.net/discuss/1840945
from pyspark.sql.functions import lit, row_number
from pyspark.sql.window import Window

w = Window.orderBy(lit("A"))
sampleDF = sampleDF.select(["NumReads"]) \
                   .withColumnRenamed("NumReads", sampleName) \
                   .withColumn("tid", row_number().over(w))


This code seems pretty convoluted to someone coming from pandas and R data frames. My unit test passes; however, it generates the following warning.



WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Is there a better way to create a row number without reordering my data? The order is important.

Kind regards

Andy

Re: How to add a row number column without reordering my data frame

Posted by Andrew Davidson <ae...@ucsc.edu.INVALID>.
Thanks!

I will take a look

Andy


Re: How to add a row number column without reordering my data frame

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,
I do not think we need to do any of that. Please try repartitionByRange; Spark 3 has adaptive query execution, with configurations to handle skew as well.
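As a rough illustration (a sketch only: the join column "Name", the partition count, and the toy data below are my assumptions, not something from Andy's pipeline):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # adaptive query execution (on by default from Spark 3.2)
    .config("spark.sql.adaptive.enabled", "true")
    # let AQE split skewed shuffle partitions during joins
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    .getOrCreate()
)

# two toy single-sample tables keyed by "Name"
left = spark.createDataFrame([("tx1", 10.0), ("tx2", 3.0)], ["Name", "sampleA"])
right = spark.createDataFrame([("tx1", 7.0), ("tx2", 0.0)], ["Name", "sampleB"])

# range-partition both sides on the join key so that matching key ranges
# land in the same partitions before the join; 8 is a placeholder count
left = left.repartitionByRange(8, "Name")
right = right.repartitionByRange(8, "Name")

joined = left.join(right, on="Name")
joined.show()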

Regards,
Gourav

Re: How to add a row number column without reordering my data frame

Posted by Andrew Davidson <ae...@ucsc.edu.INVALID>.
Hi Gourav

When I join I get an OOM. To address this, my thought was to split my tables into small batches of rows, join each batch, and then union the batches back together. My assumption is that union is a narrow transformation and as such requires fewer resources. Say I have 5 data frames I want to join together and each has 300 rows (see the sketch after the pseudo-code below).

I want to create 15 data frames.

Set1 = {11, 12, 13, 14, 15}

Set2 = {21, 22, 23, 24, 25}

Set3 = {31, 32, 33, 34, 35}

Then I join each “batch”:
S1joinDF = 11.join(12).join(13).join(14).join(15)

S2joinDF = 21.join(22).join(23).join(24).join(25)

S3joinDF = 31.join(32).join(33).join(34).join(35)

resultDF = S1joinDF.union(S2joinDF).union(S3joinDF)
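
In PySpark I picture the per-batch join and the final union looking roughly like the sketch below (the join key "Name", unionByName, and the toy batch data are assumptions for illustration only, not my real code):

from functools import reduce
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def join_batch(batch_dfs, key="Name"):
    # column-wise join of one batch of single-sample data frames on the key
    return reduce(lambda left, right: left.join(right, on=key), batch_dfs)

# toy stand-ins: each "sample" is a (Name, count) frame, and each batch holds
# the same slice of rows from every sample
batch1 = [spark.createDataFrame([("tx1", float(s))], ["Name", "sample{}".format(s)]) for s in range(5)]
batch2 = [spark.createDataFrame([("tx2", float(s))], ["Name", "sample{}".format(s)]) for s in range(5)]

joined_batches = [join_batch(b) for b in (batch1, batch2)]

# stack the joined batches back together row-wise; unionByName matches
# columns by name rather than by position
resultDF = reduce(lambda a, b: a.unionByName(b), joined_batches)
resultDF.show()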

I originally wrote my code as follows. Based on my unit test, it turns out I need to call orderBy on every iteration of the loop; sorting once outside the while loop did not resolve the problem. Given the size of my data frames that is going to crush performance. My unit test works, but I never ran it on my real data set.

        # Create a copy of the original dataframe
        copyDF = df.orderBy("Name")
        # copyDF.show()

        i = 0
        while i < numberOfSplits:
            self.logger.warn("i:{}".format(i))
            # Get the top `numRows` rows.
            # note: take() is an action, limit() is a transformation
            topDF = copyDF.limit(numRows)

            # Truncate copyDF to remove the rows fetched into topDF.
            # The original quant.sf files are sorted by name; however we
            # must call orderBy, else the row names will not line up
            # between GTEx samples. We cannot simply sort or orderBy once;
            # we have to do it on every iteration.
            copyDF = copyDF.subtract(topDF).orderBy("Name")

            retList[i] = topDF

            # Increment the split number
            i += 1

        if remainingRows > 0:
            self.logger.info("AEDWIP writing last i:{} len(retList):{}".format(i, len(retList)))
            retList[i] = copyDF
            # copyDF.show()
            # retList[i].show()


Okay, so that is the background. Rather than using orderBy, I thought that if I could add a row number I could easily split up my data frames. My code would look a lot like what I would write in pandas or R:

i = 0
while i < numBatches:
    start = i * numRows
    end = start + numRows
    print("\ni:{} start:{} end:{}".format(i, start, end))
    df = trainDF.iloc[start:end]
    i += 1

There does not seem to be an easy way to do this.
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.monotonically_increasing_id.html
The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.
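
The closest thing I have found to a pandas-style consecutive index is to drop down to the RDD API and use zipWithIndex(), which numbers rows in their current partition order without a global sort. A rough sketch, untested at my scale (the helper name with_row_index and the toy data are only for illustration):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import LongType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

def with_row_index(df, col_name="tid"):
    # zipWithIndex() assigns consecutive 0-based indices in the data frame's
    # current partition order, so the existing row order is preserved
    schema = StructType(df.schema.fields + [StructField(col_name, LongType(), False)])
    indexed = df.rdd.zipWithIndex().map(lambda pair: tuple(pair[0]) + (pair[1],))
    return spark.createDataFrame(indexed, schema)

# toy stand-in for one sample's NumReads column
sampleDF = spark.createDataFrame(
    [("tx1", 10.0), ("tx2", 3.0), ("tx3", 0.0)], ["Name", "NumReads"])
indexedDF = with_row_index(sampleDF)

# with a consecutive index, a batch is just a range filter,
# much like trainDF.iloc[start:end] in pandas
start, end = 0, 2
batchDF = indexedDF.filter((col("tid") >= start) & (col("tid") < end))
batchDF.show()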


Comments and suggestions appreciated

Andy



Re: How to add a row number column without reordering my data frame

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

I am a bit confused here; it is not entirely clear to me why you are creating the row numbers, and how creating the row numbers helps you with the joins.

Can you please explain with some sample data?


Regards,
Gourav
