You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Andrew Davidson <ae...@ucsc.edu.INVALID> on 2022/01/23 21:18:14 UTC

What are your experiences using google cloud platform

Hi recently started using GCP dataproc spark.

Seem to have trouble getting big jobs to complete. I am using check points. I am wondering if maybe I should look for another cloud solution

Kind regards

Andy

Re: What are your experiences using google cloud platform

Posted by Mich Talebzadeh <mi...@gmail.com>.
OK,

What configuration do you have for Dataproc master and worker nodes, what
machine types are they?

What storage have you allocated for each? Have you specified the Cloud
Storage staging bucket?

Have you considered autoscaling?

https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/autoscaling#enable_autoscaling_on_an_existing_cluster

HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 24 Jan 2022 at 18:47, Andrew Davidson <ae...@ucsc.edu> wrote:

> I think my problem has to do with mega-mem machine. It was hard to get
> quota for mega-mem machines.  I wonder if they are unstable? Any
> suggestions for how I look at the ‘hardware’?
>
>
>
> I ran the same job several times. They all failed in different ways. Once
> looked like sort of networking problem accessing gcp buckets
>
>
>
> Several times it looked like my jobs fail when I call df.checkpoint()
> basically no progress in my driver log files after 30 mins.  Cpu
> utilization crashes from 60 % to  almost zero. I terminated the jobs.
>
>
>
> One time the checkpoint seemed to hang after doing a series of narrow
> transformations on a single data frame
>
>
>
> Most of the time the checkpoint seem to fail while calculate rowSums, I
> have reworked the rowSum code several times. See bellow for final versoin
>
>
>
> Based on google searches it seem like in gcp dataproc, people set the
> checkpointdir to be something like gs://myBucket/checkpoint/
>
>
>
> I see the cluster has a lot of HDFSstorage. As my job runs memory
> utilization == 100%. My cluster has 2.8 Tb of memory. Spark will eventually
> start  writing something to HDFS. As a newbie I would think we would want
> to set the checkpointdir to HDFS. I do not think HDFS is the limiting
> resource. It never seems to be fully exhausted. I did a lot of googling and
> was unable find an HDFS example URL. The checkpoints() are really slow.
> Takes twice as long as when I call cache().
>
>
>
> Comments and suggestions appreciated
>
>
>
> Andy
>
>
>
>
> ###############################################################################
>
>     def rowSums( self, countsSparkDF, columnNames, columnBatchSize ):
>
>         '''
>
>         The GTEx training data set has 10409 numeric columns. This cause a
>
>         java.lang.StackOverflowError because the DAG is to big. increasing
> spark driver
>
>         memory does not help. The work around is sum  smaller batches of
> columns
>
>         and cache the results of each batch
>
>         '''
>
>         self.logger.warn("rowSums BEGIN")
>
>         totalColName = "rowSum"
>
>         for i in range(0, len(columnNames), columnBatchSize) :
>
>             tmpColName = "tmpSum" + str(i)
>
>             batch = columnNames[i:i+columnBatchSize]
>
>             countsSparkDF = self.rowSumsImpl(countsSparkDF, tmpColName,
> batch)
>
>
>
>             if i == 0:
>
>                 countsSparkDF =
> countsSparkDF.withColumnRenamed(tmpColName, totalColName)
>
>
>
>             else:
>
>                 # calculate rolling total
>
>                 countsSparkDF = countsSparkDF.withColumn(totalColName,
> col(totalColName) + col(tmpColName))
>
>                 # save space
>
>                 countsSparkDF = countsSparkDF.drop(tmpColName
> )
>
>
>
>             # use an action to force execution
>
>             numRows = countsSparkDF.count()
>
>             self.logger.warn("rowSums:batch:{} numRows:{}".format(i,
> numRows))
>
>
>
>             # check point will save the df data but not its linage
>
>             #countsSparkDF.cache()
>
>             countsSparkDF.checkpoint()
>
>
>
>         self.logger.warn("rowSums END")
>
>         return countsSparkDF
>
>
>
>
>     ###############################################################################
>
>     def rowSumsImpl( self, countsSparkDF, newColName, columnNames ):
>
>         '''
>
>         calculates actual sum of columns
>
>
>
>         arguments
>
>             countSparkDF
>
>
>
>             newColumName:
>
>                 results from column sum will be sorted here
>
>
>
>             columnNames:
>
>                 list of columns to sum
>
>
>
>         returns
>
>             amended countSparkDF
>
>         '''
>
>         self.logger.warn( "rowSumsImpl BEGIN" )
>
>
>
>         # https://stackoverflow.com/a/54283997/4586180
>
>         retDF = countsSparkDF.na.fill( 0 ).withColumn( newColName ,
> reduce( add, [col( x ) for x in columnNames] ) )
>
>
>
>         self.logger.warn( "rowSumsImpl END\n" )
>
>         return retDF
>
>
>
>
>
>
>
> *From: *Mich Talebzadeh <mi...@gmail.com>
> *Date: *Monday, January 24, 2022 at 12:54 AM
> *To: *Andrew Davidson <ae...@ucsc.edu.invalid>
> *Cc: *"user @spark" <us...@spark.apache.org>
> *Subject: *Re: What are your experiences using google cloud platform
>
>
>
> Dataproc works fine. The current version is Spark 3.1.2. Look at your
> code,  hardware and scaling.
>
>
>
> HTH
>
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Sun, 23 Jan 2022 at 21:19, Andrew Davidson <ae...@ucsc.edu.invalid>
> wrote:
>
> Hi recently started using GCP dataproc spark.
>
>
>
> Seem to have trouble getting big jobs to complete. I am using check
> points. I am wondering if maybe I should look for another cloud solution
>
>
>
> Kind regards
>
>
>
> Andy
>
>

Re: What are your experiences using google cloud platform

Posted by Andrew Davidson <ae...@ucsc.edu.INVALID>.
I think my problem has to do with mega-mem machine. It was hard to get quota for mega-mem machines.  I wonder if they are unstable? Any suggestions for how I look at the ‘hardware’?

I ran the same job several times. They all failed in different ways. Once looked like sort of networking problem accessing gcp buckets

Several times it looked like my jobs fail when I call df.checkpoint() basically no progress in my driver log files after 30 mins.  Cpu utilization crashes from 60 % to  almost zero. I terminated the jobs.

One time the checkpoint seemed to hang after doing a series of narrow transformations on a single data frame

Most of the time the checkpoint seem to fail while calculate rowSums, I have reworked the rowSum code several times. See bellow for final versoin

Based on google searches it seem like in gcp dataproc, people set the checkpointdir to be something like gs://myBucket/checkpoint/

I see the cluster has a lot of HDFSstorage. As my job runs memory utilization == 100%. My cluster has 2.8 Tb of memory. Spark will eventually start  writing something to HDFS. As a newbie I would think we would want to set the checkpointdir to HDFS. I do not think HDFS is the limiting resource. It never seems to be fully exhausted. I did a lot of googling and was unable find an HDFS example URL. The checkpoints() are really slow. Takes twice as long as when I call cache().

Comments and suggestions appreciated

Andy

###############################################################################
    def rowSums( self, countsSparkDF, columnNames, columnBatchSize ):
        '''
        The GTEx training data set has 10409 numeric columns. This cause a
        java.lang.StackOverflowError because the DAG is to big. increasing spark driver
        memory does not help. The work around is sum  smaller batches of columns
        and cache the results of each batch
        '''
        self.logger.warn("rowSums BEGIN")
        totalColName = "rowSum"
        for i in range(0, len(columnNames), columnBatchSize) :
            tmpColName = "tmpSum" + str(i)
            batch = columnNames[i:i+columnBatchSize]
            countsSparkDF = self.rowSumsImpl(countsSparkDF, tmpColName, batch)

            if i == 0:
                countsSparkDF = countsSparkDF.withColumnRenamed(tmpColName, totalColName)

            else:
                # calculate rolling total
                countsSparkDF = countsSparkDF.withColumn(totalColName, col(totalColName) + col(tmpColName))
                # save space
                countsSparkDF = countsSparkDF.drop(tmpColName )

            # use an action to force execution
            numRows = countsSparkDF.count()
            self.logger.warn("rowSums:batch:{} numRows:{}".format(i, numRows))

            # check point will save the df data but not its linage
            #countsSparkDF.cache()
            countsSparkDF.checkpoint()

        self.logger.warn("rowSums END")
        return countsSparkDF

    ###############################################################################
    def rowSumsImpl( self, countsSparkDF, newColName, columnNames ):
        '''
        calculates actual sum of columns

        arguments
            countSparkDF

            newColumName:
                results from column sum will be sorted here

            columnNames:
                list of columns to sum

        returns
            amended countSparkDF
        '''
        self.logger.warn( "rowSumsImpl BEGIN" )

        # https://stackoverflow.com/a/54283997/4586180
        retDF = countsSparkDF.na.fill( 0 ).withColumn( newColName , reduce( add, [col( x ) for x in columnNames] ) )

        self.logger.warn( "rowSumsImpl END\n" )
        return retDF



From: Mich Talebzadeh <mi...@gmail.com>
Date: Monday, January 24, 2022 at 12:54 AM
To: Andrew Davidson <ae...@ucsc.edu.invalid>
Cc: "user @spark" <us...@spark.apache.org>
Subject: Re: What are your experiences using google cloud platform

Dataproc works fine. The current version is Spark 3.1.2. Look at your code,  hardware and scaling.



HTH


 [https://docs.google.com/uc?export=download&id=1-q7RFGRfLMObPuQPWSd9sl_H1UPNFaIZ&revid=0B1BiUVX33unjMWtVUWpINWFCd0ZQTlhTRHpGckh4Wlg4RG80PQ]   view my Linkedin profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.




On Sun, 23 Jan 2022 at 21:19, Andrew Davidson <ae...@ucsc.edu.invalid> wrote:
Hi recently started using GCP dataproc spark.

Seem to have trouble getting big jobs to complete. I am using check points. I am wondering if maybe I should look for another cloud solution

Kind regards

Andy

Re: What are your experiences using google cloud platform

Posted by Mich Talebzadeh <mi...@gmail.com>.
Dataproc works fine. The current version is Spark 3.1.2. Look at your
code,  hardware and scaling.


HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 23 Jan 2022 at 21:19, Andrew Davidson <ae...@ucsc.edu.invalid>
wrote:

> Hi recently started using GCP dataproc spark.
>
>
>
> Seem to have trouble getting big jobs to complete. I am using check
> points. I am wondering if maybe I should look for another cloud solution
>
>
>
> Kind regards
>
>
>
> Andy
>

Re: What are your experiences using google cloud platform

Posted by German Schiavon <gs...@gmail.com>.
Hi,

Changing cloud providers won't help if your job is slow, has skew, etc... I
think first you have to see why "big jobs" are not completing.


On Sun, 23 Jan 2022 at 22:18, Andrew Davidson <ae...@ucsc.edu.invalid>
wrote:

> Hi recently started using GCP dataproc spark.
>
>
>
> Seem to have trouble getting big jobs to complete. I am using check
> points. I am wondering if maybe I should look for another cloud solution
>
>
>
> Kind regards
>
>
>
> Andy
>