Posted to dev@airflow.apache.org by Boris <bo...@gmail.com> on 2017/10/13 13:54:53 UTC

Return results optionally from spark_sql_hook

Hi guys,

I opened a JIRA for this and will be working on a PR:
https://issues.apache.org/jira/browse/AIRFLOW-1713

Any objections or suggestions, conceptually?

Fokko, I see you have been actively contributing to the Spark hooks and
operators, so I could use your opinion before I implement this.

Boris

Re: Return results optionally from spark_sql_hook

Posted by Boris Tyukin <bo...@boristyukin.com>.
Great, this is what I expected to hear, but I wanted to double-check. Thanks
for all your help, Fokko

Re: Return results optionally from spark_sql_hook

Posted by "Driesprong, Fokko" <fo...@driesprong.frl>.
Hi Boris,

When kicking off Spark jobs from Airflow, cluster mode is highly
recommended, since the workload of the driver is then on the Hadoop cluster
and not on the Airflow machine itself. Personally I prefer the spark-submit
operator, since it pulls all the connection variables directly from Airflow,
so you end up with a central place (Airflow connections) where all the
configuration is kept. Otherwise you end up with configuration scattered
throughout your Airflow logic.
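
For example, a minimal sketch of what that could look like (the application
path, task id, and connection id below are made up, and the deploy mode is
assumed to come from the Spark connection's extra settings):

from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

# Assumes an Airflow Spark connection, e.g. 'spark_yarn_cluster', that points
# at YARN with extra {"deploy-mode": "cluster"}, so the driver runs on the
# Hadoop cluster instead of on the Airflow worker.
submit_etl = SparkSubmitOperator(
    task_id='submit_etl',
    application='/path/to/etl_job.py',   # hypothetical PySpark application
    conn_id='spark_yarn_cluster',
    application_args=['--partition', '2017-10-14'],  # illustrative argument
    dag=dag,
)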

Cheers, Fokko

Re: Return results optionally from spark_sql_hook

Posted by Boris <bo...@gmail.com>.
Thanks Fokko. Do you know if it is better to use PySpark directly within a
Python operator or to invoke spark-submit instead? My understanding is that
in both cases Airflow uses the yarn-client deploy mode, not yarn-cluster, so
the Spark driver always runs on the same node as the Airflow worker. Not sure
that is the best practice...

Re: Return results optionally from spark_sql_hook

Posted by "Driesprong, Fokko" <fo...@driesprong.frl>.
Hi Boris,

Instead of writing it to a file, you can also write it to XCom; this will
keep everything inside of Airflow. My personal opinion on this: spark-sql is
a bit limited by nature, since it only supports SQL. If you want to do more
dynamic stuff, you will eventually have to move to spark-submit anyway.
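
A rough sketch of the XCom route (the task ids and the count logic are
placeholders):

from airflow.operators.python_operator import PythonOperator

def fetch_count(**context):
    row_count = 42  # placeholder: e.g. obtained via a PySpark count
    # The return value of a PythonOperator callable is pushed to XCom
    # under the key 'return_value'.
    return row_count

def check_count(**context):
    count = context['ti'].xcom_pull(task_ids='fetch_count')
    if count == 0:
        raise ValueError('Partition is empty, stopping here')

fetch = PythonOperator(task_id='fetch_count', python_callable=fetch_count,
                       provide_context=True, dag=dag)
check = PythonOperator(task_id='check_count', python_callable=check_count,
                       provide_context=True, dag=dag)
fetch >> check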

Cheers, Fokko

Re: Return results optionally from spark_sql_hook

Posted by Boris <bo...@gmail.com>.
Thanks Fokko, I think that will do it, but my concern is that in this case my
DAG will initiate two separate Spark sessions, and it takes about 20 seconds
in our YARN environment to create one. I need to run 600 DAGs like that every
morning.

I am now thinking of creating a PySpark job that will do the insert and the
count and write the count to a temp file. Still not ideal... I wish I could
just parse the spark-sql output instead.
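
Something along these lines is what I have in mind (database, table, and
file names are made up), so both statements share a single SparkSession:

# insert_and_count.py - rough sketch, to be run via spark-submit
import sys
from pyspark.sql import SparkSession

partition_dt = sys.argv[1]  # e.g. '2017-10-14'

spark = (SparkSession.builder
         .appName('insert_and_count')
         .enableHiveSupport()
         .getOrCreate())

# Both statements run in the same SparkSession, so YARN starts only one application.
spark.sql("""
    INSERT OVERWRITE TABLE stage_db.my_table PARTITION (dt='{0}')
    SELECT col_a, col_b FROM src_db.my_source WHERE dt='{0}'
""".format(partition_dt))

count = spark.sql(
    "SELECT COUNT(*) AS cnt FROM stage_db.my_table WHERE dt='{0}'".format(partition_dt)
).first()['cnt']

# Write the count somewhere the DAG can pick it up (a temp file here).
with open('/tmp/row_count_{0}.txt'.format(partition_dt), 'w') as f:
    f.write(str(count))

spark.stop()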

Re: Return results optionally from spark_sql_hook

Posted by "Driesprong, Fokko" <fo...@driesprong.frl>.
Hi Boris,

That sounds like a nice DAG.

This is how I would do it: first run the long-running query in a spark-sql
operator like you have now. Then create a Python function that builds a
SparkSession (using the PySpark API) and fetches the count from the
partition that you've just created. Create a BranchPythonOperator that
invokes this function and branches on whether the count is OK:

   - If the count is okay, branch downstream and continue with the normal
   execution.
   - If the count is off, terminate and send yourself an email/Slack message
   that the count is not as expected.

This will look something like this:
[image: Inline afbeelding 1]
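
In code, a rough sketch could look like this (task ids, database, table, and
the threshold are just placeholders):

from airflow.operators.python_operator import BranchPythonOperator
from pyspark.sql import SparkSession

def check_partition_count(**context):
    spark = (SparkSession.builder
             .appName('partition_count_check')
             .enableHiveSupport()
             .getOrCreate())
    count = spark.sql(
        "SELECT COUNT(*) FROM stage_db.my_table WHERE dt='{}'".format(context['ds'])
    ).first()[0]
    spark.stop()
    # BranchPythonOperator follows the task_id returned by the callable.
    return 'exchange_partition' if count > 0 else 'notify_count_off'

check_count = BranchPythonOperator(
    task_id='check_count',
    python_callable=check_partition_count,
    provide_context=True,
    dag=dag,
)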

Would this solve your problem?

Cheers, Fokko

Re: Return results optionally from spark_sql_hook

Posted by Boris Tyukin <bo...@boristyukin.com>.
Hi Fokko, thanks for your response, really appreciate it!

Basically in my case I have two Spark SQL queries:

1) The first query does an INSERT OVERWRITE into a partition and may take a
while to run.
2) Then I run a second query right after it to get the count of rows in that
partition.
3) I need to pass that count back to the Airflow DAG, where it will be used
by the next task to decide whether this partition can be safely exchanged
(using ALTER TABLE EXCHANGE PARTITION) with a production table partition.

So I somehow need to get that row count. My initial thought was to parse the
log and extract the count, but even with a regex that does not quite work -
spark-sql writes query output to stdout, which the Airflow Spark SQL hook
does not capture right now.

If you can suggest a better solution for me, that would be great!

Also, initially I wanted to count the rows and then do the ALTER TABLE
EXCHANGE PARTITION in the same PySpark job, but I found out that Spark does
not support this statement yet, so I have to use Hive.
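
For reference, the Hive step would be something along these lines (the table
names and connection id are made up):

from airflow.operators.hive_operator import HiveOperator

exchange_partition = HiveOperator(
    task_id='exchange_partition',
    hive_cli_conn_id='hive_cli_default',
    # Moves the freshly loaded partition from the staging table into the
    # production table; Spark SQL cannot run this statement, so it goes
    # through Hive.
    hql="""
        ALTER TABLE prod_db.my_table
        EXCHANGE PARTITION (dt='{{ ds }}')
        WITH TABLE stage_db.my_table
    """,
    dag=dag,
)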

Re: Return results optionally from spark_sql_hook

Posted by "Driesprong, Fokko" <fo...@driesprong.frl>.
Hi Boris,

Thank you for your question, and excuse me for the late response; I'm
currently on holiday.

The solution that you suggest would not be my preferred choice. Extracting
results from a log using a regex is computationally expensive and
error-prone. My question is: what are you trying to accomplish? For me there
are two ways of using the spark-sql operator:

   1. ETL using Spark: instead of returning the results, write the results
   back to a new table, or a new partition within the table. This data can be
   used downstream in the DAG. Also, this writes the data to HDFS, which is
   nice for persistence.
   2. Write the data in a simple and widely supported format (such as CSV)
   onto HDFS. You can then get the data from HDFS using `hdfs dfs -get` to
   your local file system, or use `hdfs dfs -cat ... | application.py` to
   pipe it to your application directly.

What you are trying to accomplish looks to me like something that would fit
spark-submit, where you can submit PySpark applications and fetch the
results from Spark directly:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Python version 2.7.14 (default, Oct 11 2017 10:13:33)
SparkSession available as 'spark'.
>>> spark.sql("SELECT 1 as count").first()
Row(count=1)

Most of the time we use spark-sql to transform the data, then use Sqoop to
get the data from HDFS into an RDBMS to expose the data to the business.
These examples are for Spark on HDFS, but for S3 it is much the same.

Does this answer your question? If not, could you elaborate on the problem
that you are facing?

Ciao, Fokko