You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Sidney Feiner <si...@startapp.com> on 2017/07/04 12:15:44 UTC

RE: [PySpark] - running processes and computing time

To initialize it per executor, I used a class with only class attibutes and class methods (like an `object` in Scala), but because  I was using PySpark, it was actually being initiated per process ☹
What I went for was the broadcast variable but there still is something suspicious with my application – the processing time of each batch.

In my logs, I see that when I process a partition, it takes under a second. But in the Spark UI I see that a task takes between 3 and 6 seconds.
Shouldn't the partition process time and the task computing time be the same?

My code:


def process_func (obj, records):
    start = time()
    processed_records = # Some processing
    logger.info("It took {0} seconds to handle records".format(time() - start, events_amount))  # This logs very small numbers (around 0.05 seoonds)
    return analyzed_events

def handle_rdd(rdd: RDD):
    start_time = time.time()
    rdd.foreachPartition(lambda records: process_func(object_broadcast.value, records))
    logger.info("Handle RDD took: {0} seconds".format(time.time() - start_time))  # This logs much bigger numbers (around 3-6 seconds)

ssc.union(*streams)\
    .filter(lambda x: x[1] is not None)\
    .map(lambda x: x[1])\
    .foreachRDD(handle_rdd)  # Keep only values and cast them to TextAnalysis
ssc.start()
ssc.awaitTermination()


each RDD has at most 10 partitions which means that it should take around 0.5 seconds for all the tasks to be processed.

Does anyone know what happens here? The time difference is too big for it to be networking right?

From: Sudev A C [mailto:sudev.ac@go-mmt.com]
Sent: Monday, July 3, 2017 7:48 PM
To: Sidney Feiner <si...@startapp.com>; user@spark.apache.org
Subject: Re: [PySpark] - running processes

You might want to do the initialisation per partition (Not sure how you are achieving the per executor initialisation in your code ).

To initialise something for per partition, you may use something like rdd.forEach partition.

Or if you want something globally like a variable for further processing you might want to initialise it once as a broadcast variable and use access the data structure through broadcast variable.

Afaik python process will be initiated for per partition tasks.
On Mon, 3 Jul 2017 at 5:23 PM, Sidney Feiner <si...@startapp.com>> wrote:

In my Spark Streaming application, I have the need to build a graph from a file and initializing that graph takes between 5 and 10 seconds.

So I tried initializing it once per executor so it'll be initialized only once.

After running the application, I've noticed that it's initiated much more than once per executor, every time with a different process id (every process has it's own logger).

Doesn't every executor have it's own JVM and it's own process? Or is that only relevant when I develop in JVM languages like Scala/Java? Do executors in PySpark spawn new processes for new tasks?

And if they do, how can I make sure that my graph object will really only be initiated once?
Thanks :)


Sidney Feiner / SW Developer
M: +972.528197720 / Skype: sidney.feiner.startapp

[emailsignature]




::DISCLAIMER::

----------------------------------------------------------------------------------------------------------------------------------------------------



This message is intended only for the use of the addressee and may contain information that is privileged, confidential and exempt from disclosure under applicable law. If the reader of this message is not the intended recipient, or the employee or agent responsible for delivering the message to the intended recipient, you are hereby notified that any dissemination, distribution or copying of this communication is strictly prohibited. If you have received this e-mail in error, please notify us immediately by return e-mail and delete this e-mail and all attachments from your system.