Posted to dev@airflow.apache.org by Andrew Phillips <an...@apache.org> on 2016/08/03 21:29:41 UTC

Suggested way of passing "input parameters" to a DAG run?

Hi all

What is/are the suggested way(s) of passing "input parameters" to a DAG 
run (adding quotes since, as far as we can tell, that concept doesn't 
exist natively in Airflow, probably by design)?

This would be information that is used by one or multiple operators in a 
DAG run and that should not change for all task instances in that DAG 
run, but may be different for another DAG run executing concurrently. An 
example would be a Git pull request number.

What we tried first was to use a Variable for this, but it doesn't look 
like that will work because the value can change during the execution of 
the DAG run. At least, that seems to be the case in the way we're using 
it:

input_params = Variable.get(<variable_for_dag>)
dag = DAG(..., params=input_params)

We had hoped that this would "fix" the values of the parameters when the 
DAG run was created, but that does not seem to be the case: if the 
variable is updated (in preparation for a new DAG run) while a DAG run 
is active, tasks that haven't executed yet see the new value. I.e. we 
end up seeing this:

set Variable my_param to "foo"
dag_run_1 starts, reads the Variable and passes my_param to the DAG object
dag_run_1.op_1 evaluates {{ params.my_param }} and gets "foo"
set Variable my_param to "bar"
dag_run_2 starts, reads the Variable and passes my_param to the DAG object
dag_run_1.op_2 evaluates {{ params.my_param }} and sees "bar"  # want this to still be "foo"!
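For what it's worth, the sequence above can be reproduced without Airflow at all. This is a stdlib-only sketch (names hypothetical) of why it happens: the DAG definition file is re-parsed for each task, so a module-level Variable.get() is re-evaluated on every parse and picks up mid-run edits:

```python
# Stdlib-only sketch (no Airflow; names hypothetical) of the failure
# mode above: the DAG file is re-parsed for each task, so a module-level
# Variable.get() is re-evaluated and sees mid-run updates.
variable_store = {"my_param": "foo"}  # stands in for Airflow's Variable table

def parse_dag_file():
    # Mimics one parse of the DAG file: params are read at parse time.
    return {"params": dict(variable_store)}

dag_seen_by_op_1 = parse_dag_file()   # parse for dag_run_1.op_1
variable_store["my_param"] = "bar"    # Variable updated for dag_run_2
dag_seen_by_op_2 = parse_dag_file()   # later parse for dag_run_1.op_2

assert dag_seen_by_op_1["params"]["my_param"] == "foo"
assert dag_seen_by_op_2["params"]["my_param"] == "bar"  # the "fix" did not hold
```

So the value is not "frozen" at DAG run creation; it is simply re-read whenever the file is parsed.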

Not sure at this point whether this is a bug or, if not, whether there's 
a different way to retrieve the value of a variable that allows us to 
"fix" it for the duration of the DAG run.

Or, taking a step back, is there some other approach that we could use 
to store and retrieve input data for DAG runs?

Regards

ap



Re: Suggested way of passing "input parameters" to a DAG run?

Posted by Andrew Phillips <an...@apache.org>.

On 2016-08-03 18:42, Joseph Napolitano wrote:
> What I can say is that we use it a lot, but very lightly.  We basically 
> use
> it to communicate the S3 key for a flat file between operators.

Just to follow up on this: here's a Gist with some example usage based 
on the "store in XCom" approach:

https://gist.github.com/anonymous/b41228011275fb362aa5b95b476de542

We're using a macro to make retrieval a little nicer, and the end result 
is not too different from reading from {{ params }}.
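For readers without access to the Gist, here is a rough sketch of the macro idea (not the Gist's actual code; all names are hypothetical): wrap the XCom lookup in a helper function, which in a real DAG could be exposed to templates via the DAG's user_defined_macros argument.

```python
# Hedged sketch of the "macro" approach (not the Gist's actual code;
# names hypothetical). In Airflow, get_input would be registered via
# DAG(user_defined_macros={"get_input": get_input}) so templates can
# write {{ get_input('my_param') }} instead of a raw ti.xcom_pull(...).
xcom_store = {"input_task": {"my_param": "foo"}}  # what the input task pushed

def get_input(name, task_id="input_task"):
    """Macro-style helper: look up one key from the upstream task's XCom dict."""
    return xcom_store[task_id][name]

assert get_input("my_param") == "foo"
```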

Nevertheless, +1 for a more first-class approach, if possible. {{ 
dag_run.params.foo }}, anyone? ;-)

On a related note, out of curiosity: does anyone have information on 
use cases where changes to dag.params are useful during a DAG run? Or 
is that just a consequence of the fact that the DAG object is 
instantiated multiple times during a single run?

Is there an implicit assumption that the values passed to the DAG object 
are the same every time within a single run? If so, that might be a good 
topic for an FAQ entry/blog post?

Regards

ap

Re: Suggested way of passing "input parameters" to a DAG run?

Posted by Joseph Napolitano <jo...@blueapron.com.INVALID>.
What I can say is that we use it a lot, but very lightly.  We basically use
it to communicate the S3 key for a flat file between operators.

Definitely don't use it to send actual data :)
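A sketch of that "pointer, not payload" pattern (hypothetical names; no real S3 calls are made): the upstream task returns only the S3 key, so that is all that lands in XCom, and the downstream task fetches the file itself.

```python
# Sketch of "pointer, not payload" (hypothetical names; no S3 calls):
# push only the S3 key through XCom, never the file contents.
def produce_flat_file():
    # Upstream task: after writing the file to S3, return just its key.
    # The return value is what ends up in XCom, so it stays tiny.
    return "exports/2016-08-03/orders.csv"

def consume_flat_file(s3_key):
    # Downstream task: receives the key and fetches the file itself.
    return "s3://example-bucket/" + s3_key

key = produce_flat_file()
full_path = consume_flat_file(key)
assert full_path == "s3://example-bucket/exports/2016-08-03/orders.csv"
```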

Cheers

On Wed, Aug 3, 2016 at 6:21 PM, Andrew Phillips <an...@apache.org> wrote:

> Let me know if that helps, or if I completely misunderstood :)
>>
>
> That helps, indeed - thanks, Joe! We were in fact going down exactly this
> path as an alternative; we were just a bit hesitant to use XComs based on
> the following comment in the docs [1]:
>
> "If it absolutely can’t be avoided, Airflow does have a feature for
> operator cross-communication called XCom that is described elsewhere in
> this document."
>
> The statement talks about sharing information *between* tasks, but we
> weren't sure if this should be read as "stay away from XComs unless there's
> no other option". Curious to hear the community's thoughts on that.
>
> Thanks for the quick response!
>
> ap
>
> [1] https://pythonhosted.org/airflow/concepts.html#operators
>



-- 
*Joe Napolitano *| Sr. Data Engineer
www.blueapron.com | 5 Crosby Street, New York, NY 10013

Re: Suggested way of passing "input parameters" to a DAG run?

Posted by Andrew Phillips <an...@apache.org>.
> Let me know if that helps, or if I completely misunderstood :)

That helps, indeed - thanks, Joe! We were in fact going down exactly 
this path as an alternative; we were just a bit hesitant to use XComs 
based on the following comment in the docs [1]:

"If it absolutely can't be avoided, Airflow does have a feature for 
operator cross-communication called XCom that is described elsewhere in 
this document."

The statement talks about sharing information *between* tasks, but we 
weren't sure if this should be read as "stay away from XComs unless 
there's no other option". Curious to hear the community's thoughts on 
that.

Thanks for the quick response!

ap

[1] https://pythonhosted.org/airflow/concepts.html#operators

Re: Suggested way of passing "input parameters" to a DAG run?

Posted by Joseph Napolitano <jo...@blueapron.com.INVALID>.
There are a lot of ways to define the input source.  Let's suppose you have
these inputs in a relational database, or a flat file on S3.

The first task in your DAG would be a matter of querying for those inputs,
or grabbing the file.  The trick is getting the inputs to later tasks.  The
XCom feature is a way to share data between your tasks, so it's a matter of
pulling the XCom value from the task that originally queried the inputs.

Suppose you had an "input operator"

from airflow.models import BaseOperator

class InputOperator(BaseOperator):
    def execute(self, context):
        # Whatever you return here is retrievable in later tasks through XCom
        return {"input_key": "input_value"}

Then in your DAG:

input_operator_task = InputOperator(task_id='input_operator_task', dag=dag)

downstream_task = SomeExistingOperator(
    task_id='downstream_task',
    keyword_arg_using_your_inputs="{{ ti.xcom_pull(task_ids='input_operator_task') }}",
    dag=dag
)

The XCOM pull is evaluated through the Jinja template.
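As a concrete illustration of that last point, here is a small sketch of what happens at render time. FakeTaskInstance is a stand-in for Airflow's TaskInstance, and jinja2 (a dependency of Airflow) is assumed to be installed; the templated keyword argument picks up whatever the upstream task's execute() returned.

```python
# Sketch of how the templated kwarg gets its value: at runtime Airflow
# renders the Jinja string with a context containing `ti`, whose
# xcom_pull() returns what the upstream task's execute() returned.
# FakeTaskInstance is a stand-in for Airflow's TaskInstance.
from jinja2 import Template

class FakeTaskInstance:
    def __init__(self, xcoms):
        self._xcoms = xcoms  # task_id -> value pushed by that task

    def xcom_pull(self, task_ids):
        return self._xcoms[task_ids]

xcoms = {"input_operator_task": {"input_key": "input_value"}}
tmpl = Template("{{ ti.xcom_pull(task_ids='input_operator_task') }}")
rendered = tmpl.render(ti=FakeTaskInstance(xcoms))

assert "input_value" in rendered  # the rendered kwarg contains the inputs
```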

Let me know if that helps, or if I completely misunderstood :)

Joe Nap

On Wed, Aug 3, 2016 at 5:29 PM, Andrew Phillips <an...@apache.org> wrote:

> Hi all
>
> What is/are the suggested way(s) of passing "input parameters" to a DAG
> run (adding quotes since, as far as we can tell, that concept doesn't exist
> natively in Airflow, probably by design)?
>
> This would be information that is used by one or multiple operators in a
> DAG run and that should not change for all task instances in that DAG run,
> but may be different for another DAG run executing concurrently. An example
> would be a Git pull request number.
>
> What we tried first was to use a Variable for this, but it doesn't look
> like that will work because the value can change during the execution of
> the DAG run. At least, that seems to be the case in the way we're using it:
>
> input_params = Variable.get(<variable_for_dag>)
> dag = DAG(..., params=input_params)
>
> We had hoped that this would "fix" the values of the parameters when the
> DAG run was created, but that does not seem to be the case: if the variable
> is updated (in preparation for a new DAG run) while a DAG run is active,
> tasks that haven't executed yet see the new value. I.e. we end up seeing
> this:
>
> set Variable my_param to "foo"
> dag_run_1 starts, gets the variable and passes my_param to the Dag object
> dag_run_1.op_1 evaluates {{ params.my_param }} and gets "foo"
> set Variable my_param to "bar"
> dag_run_2 starts and passes var to the Dag object
> dag_run_1.op_2 evaluates {{ params.my_param }} and sees "bar" # want this
> to still be foo!
>
> Not sure at this point whether this is a bug or, if not, whether there's a
> different way to retrieve the value of a variable that allows us to "fix"
> it for the duration of the DAG run.
>
> Or, taking a step back, is there some other approach that we could use to
> store and retrieve input data to DAGs?
>
> Regards
>
> ap
>


-- 
*Joe Napolitano *| Sr. Data Engineer
www.blueapron.com | 5 Crosby Street, New York, NY 10013