Posted to user@spark.apache.org by Mike Sukmanowsky <mi...@gmail.com> on 2015/08/20 15:34:31 UTC

PySpark concurrent jobs using single SparkContext

Hi all,

We're using Spark 1.3.0 on a small YARN cluster to do some log processing.
The jobs are pretty simple: for a number of customers and a number of days,
fetch some event log data, build aggregates, and store those aggregates in
a data store.

The way our script is written right now does something akin to:

with SparkContext() as sc:
    for customer in customers:
        for day in days:
            logs = sc.textFile(get_logs(customer, day))
            aggregate = make_aggregate(logs)
            # This function contains the action saveAsNewAPIHadoopFile which
            # triggers a save
            save_aggregate(aggregate)

So we have a Spark job per customer, per day.

I tried doing some parallel job submission with something similar to:

from concurrent import futures

from pyspark import SparkContext


def make_and_save_aggregate(customer, day, spark_context):
    # Without a separate threading.Lock() here or, better yet, one guarding the
    # Spark context, multiple customer/day transformations and actions could
    # be interleaved
    sc = spark_context
    logs = sc.textFile(get_logs(customer, day))
    aggregate = make_aggregate(logs)
    save_aggregate(aggregate)


with SparkContext() as sc, futures.ThreadPoolExecutor(4) as executor:
    for customer in customers:
        for day in days:
            executor.submit(make_and_save_aggregate, customer, day, sc)

The problem is that, with no locks on a SparkContext except during
initialization
<https://github.com/apache/spark/blob/v1.3.0/python/pyspark/context.py#L214-L241>
and shutdown
<https://github.com/apache/spark/blob/v1.3.0/python/pyspark/context.py#L296-L307>,
operations on the context could (if I understand correctly) be interleaved,
leading to a DAG that contains transformations out of order and from
different customer/day periods.
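
A minimal sketch of the lock-guarded variant described in the code comment
above. Whether such a lock is needed at all is the open question of this
post; the sketch only illustrates the pattern. FakeContext, the path layout
in get_logs, and the tuple used as an "aggregate" are hypothetical stand-ins
so the example runs without a cluster.

```python
import threading
from concurrent import futures


class FakeContext:
    """Hypothetical stand-in for a SparkContext (assumption, not PySpark)."""

    def textFile(self, path):
        return ["line from " + path]


# One lock shared by all worker threads; it guards calls on the context.
context_lock = threading.Lock()


def get_logs(customer, day):
    # Hypothetical path layout; the real get_logs is not shown in the post.
    return "logs/{}/{}".format(customer, day)


def make_and_save_aggregate(customer, day, sc):
    # Only the calls that touch the shared context are serialized.
    with context_lock:
        logs = sc.textFile(get_logs(customer, day))
    # Stand-in for make_aggregate + save_aggregate. It runs outside the
    # lock, so work for different customer/day pairs can still overlap.
    return (customer, day, len(logs))


def run_all(customers, days, workers=4):
    sc = FakeContext()
    with futures.ThreadPoolExecutor(workers) as executor:
        pending = [executor.submit(make_and_save_aggregate, c, d, sc)
                   for c in customers for d in days]
        # result() re-raises any exception raised in a worker thread,
        # which a bare executor.submit() call would silently drop.
        return sorted(f.result() for f in pending)
```

Keeping the futures and calling result() on each one is a deliberate choice:
otherwise a failed customer/day pair would go unnoticed.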

One solution is instead to launch multiple Spark jobs via spark-submit and
let YARN/Spark's dynamic executor allocation take care of fair scheduling.
In practice, this doesn't seem to yield very fast computation, perhaps due
to some additional overhead with YARN.
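
For reference, a rough sketch of that spark-submit-per-pair approach,
assuming a hypothetical script aggregate_job.py that processes one
customer/day pair given as arguments, and assuming the cluster side of
dynamic allocation is already configured. The run parameter exists only so
the launcher can be exercised without a cluster.

```python
import subprocess
from concurrent import futures


def build_command(customer, day, script="aggregate_job.py"):
    # "aggregate_job.py" is a hypothetical per-customer/day script.
    return ["spark-submit",
            "--conf", "spark.dynamicAllocation.enabled=true",
            script, customer, day]


def launch_all(customers, days, max_parallel=4, run=subprocess.call):
    commands = [build_command(c, d) for c in customers for d in days]
    # The threads only wait on child processes, so a small pool is enough.
    with futures.ThreadPoolExecutor(max_parallel) as executor:
        exit_codes = list(executor.map(run, commands))
    # Pair each command with its exit code so failures can be inspected.
    return list(zip(commands, exit_codes))
```

Each submission starts its own application, so the startup cost is paid
once per customer/day pair.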

Is there any safe way to launch concurrent jobs like this using a single
PySpark context?

-- 
Mike Sukmanowsky
Aspiring Digital Carpenter

*e*: mike.sukmanowsky@gmail.com

LinkedIn <http://www.linkedin.com/profile/view?id=10897143> | github
<https://github.com/msukmanowsky>

Re: PySpark concurrent jobs using single SparkContext

Posted by Hemant Bhanawat <he...@gmail.com>.
It seems like you want simultaneous processing of multiple jobs but, at the
same time, serialization of a few tasks within those jobs. I don't know how
to achieve that in Spark.

But why would you worry about the interleaved processing when the data
being aggregated in the different jobs is per customer, per day? Is it
that save_aggregate depends on the results of other customers and/or other
days?

I also don't understand how you would achieve that with YARN, because
interleaving of tasks from separately submitted jobs may happen with dynamic
executor allocation as well.

Hemant

