Posted to dev@airflow.apache.org by Andrew Phillips <an...@apache.org> on 2016/08/03 21:29:41 UTC
Suggested way of passing "input parameters" to a DAG run?
Hi all
What is/are the suggested way(s) of passing "input parameters" to a DAG
run (adding quotes since, as far as we can tell, that concept doesn't
exist natively in Airflow, probably by design)?
This would be information that is used by one or multiple operators in a
DAG run and that should not change for all task instances in that DAG
run, but may be different for another DAG run executing concurrently. An
example would be a Git pull request number.
What we tried first was to use a Variable for this, but it doesn't look
like that will work because the value can change during the execution of
the DAG run. At least, that seems to be the case in the way we're using
it:
input_params = Variable.get(<variable_for_dag>)
dag = DAG(..., params=input_params)
We had hoped that this would "fix" the values of the parameters when the
DAG run was created, but that does not seem to be the case: if the
variable is updated (in preparation for a new DAG run) while a DAG run
is active, tasks that haven't executed yet see the new value. I.e. we
end up seeing this:
set Variable my_param to "foo"
dag_run_1 starts, gets the variable and passes my_param to the Dag
object
dag_run_1.op_1 evaluates {{ params.my_param }} and gets "foo"
set Variable my_param to "bar"
dag_run_2 starts and passes var to the Dag object
dag_run_1.op_2 evaluates {{ params.my_param }} and sees "bar" # want this to still be foo!
Not sure at this point whether this is a bug or, if not, whether there's
a different way to retrieve the value of a variable that allows us to
"fix" it for the duration of the DAG run.
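To illustrate what we think is happening, here is a plain-Python sketch of the late-binding behaviour (an illustration of the mechanism only, not Airflow's actual implementation):

```python
# A dict stands in for Airflow's Variable table. Re-reading it at
# template-render time leaks updates into a running DAG run, while a
# snapshot taken when the run is created would not.

variable_store = {"my_param": "foo"}

def render_template(key):
    # Templates are re-evaluated per task instance, so each evaluation
    # sees the store's current value.
    return variable_store[key]

def snapshot(key):
    # What we had hoped for: capture the value once per DAG run.
    value = variable_store[key]
    return lambda: value

dag_run_1_params = snapshot("my_param")
assert render_template("my_param") == "foo"   # dag_run_1.op_1

variable_store["my_param"] = "bar"            # prepared for dag_run_2

assert render_template("my_param") == "bar"   # dag_run_1.op_2 (observed)
assert dag_run_1_params() == "foo"            # dag_run_1.op_2 (desired)
```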
Or, taking a step back, is there some other approach that we could use
to store and retrieve input data to DAGs?
Regards
ap
Re: Suggested way of passing "input parameters" to a DAG run?
Posted by Andrew Phillips <an...@apache.org>.
On 2016-08-03 18:42, Joseph Napolitano wrote:
> What I can say is that we use it a lot, but very lightly. We basically
> use
> it to communicate the S3 key for a flat file between operators.
Just to follow up on this: here's a Gist with some example usage based
on the "store in XCom" approach:
https://gist.github.com/anonymous/b41228011275fb362aa5b95b476de542
We're using a macro to make retrieval a little nicer, and the end result
is not too different from reading from {{ params }}.
Nevertheless, +1 for a more first-class approach, if possible. {{ dag_run.params.foo }}, anyone? ;-)
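The reason the XCom approach behaves the way we want seems to be that XCom values are keyed per execution date and task, so concurrent runs can't see each other's values. Roughly (illustrative names, not Airflow's actual storage schema):

```python
# Simplified model of XCom keying -- illustrative only.
xcom_table = {}

def xcom_push(dag_id, execution_date, task_id, value):
    xcom_table[(dag_id, execution_date, task_id)] = value

def xcom_pull(dag_id, execution_date, task_id):
    return xcom_table[(dag_id, execution_date, task_id)]

# dag_run_1 stores its input parameters at the start of the run
xcom_push("my_dag", "2016-08-03", "read_input", {"my_param": "foo"})
# dag_run_2 starts concurrently with different inputs
xcom_push("my_dag", "2016-08-04", "read_input", {"my_param": "bar"})

# later tasks in dag_run_1 still see "foo", unlike with a shared Variable
assert xcom_pull("my_dag", "2016-08-03", "read_input")["my_param"] == "foo"
```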
On a related note, out of curiosity: does someone have any information
on use case(s) where changes to dag.params are useful during a DAG run?
Or is that just a consequence of the fact that the DAG object is
instantiated multiple times during a single run?
Is there an implicit assumption that the values passed to the DAG object
are the same every time within a single run? If so, that might be a good
topic for an FAQ entry/blog post?
Regards
ap
Re: Suggested way of passing "input parameters" to a DAG run?
Posted by Joseph Napolitano <jo...@blueapron.com.INVALID>.
What I can say is that we use it a lot, but very lightly. We basically use
it to communicate the S3 key for a flat file between operators.
Definitely don't use it to send actual data :)
Cheers
On Wed, Aug 3, 2016 at 6:21 PM, Andrew Phillips <an...@apache.org> wrote:
>> Let me know if that helps, or if I completely misunderstood :)
>
> That helps, indeed - thanks, Joe! We were in fact going down exactly this
> path as an alternative; we were just a bit hesitant to use XComs based on
> the following comment in the docs [1]:
>
> "If it absolutely can’t be avoided, Airflow does have a feature for
> operator cross-communication called XCom that is described elsewhere in
> this document."
>
> The statement talks about sharing information *between* tasks, but we
> weren't sure if this should be read as "stay away from XComs unless there's
> no other option". Curious to hear the community's thoughts on that.
>
> Thanks for the quick response!
>
> ap
>
> [1] https://pythonhosted.org/airflow/concepts.html#operators
>
--
*Joe Napolitano *| Sr. Data Engineer
www.blueapron.com | 5 Crosby Street, New York, NY 10013
Re: Suggested way of passing "input parameters" to a DAG run?
Posted by Andrew Phillips <an...@apache.org>.
> Let me know if that helps, or if I completely misunderstood :)
That helps, indeed - thanks, Joe! We were in fact going down exactly
this path as an alternative; we were just a bit hesitant to use XComs
based on the following comment in the docs [1]:
"If it absolutely can't be avoided, Airflow does have a feature for
operator cross-communication called XCom that is described elsewhere in
this document."
The statement talks about sharing information *between* tasks, but we
weren't sure if this should be read as "stay away from XComs unless
there's no other option". Curious to hear the community's thoughts on
that.
Thanks for the quick response!
ap
[1] https://pythonhosted.org/airflow/concepts.html#operators
Re: Suggested way of passing "input parameters" to a DAG run?
Posted by Joseph Napolitano <jo...@blueapron.com.INVALID>.
There are a lot of ways to define the input source. Let's suppose you have
these inputs in a relational database, or a flat file on S3.
The first task in your DAG would be a matter of querying for those inputs,
or grabbing the file. The trick is getting the inputs to later tasks. The
XCOM feature is a way to share data between your tasks, so it's a matter of
pulling the XCOM from the task that originally queried the inputs.
Suppose you had an "input operator" with an execute method:

class InputOperator(BaseOperator):
    def execute(self, context):
        # Whatever you return here is retrievable in later tasks through XCom
        return {"input_key": "input_value"}

Then in your DAG:

input_operator_task = InputOperator(task_id='input_operator_task', dag=dag)
downstream_task = SomeExistingOperator(
    task_id='downstream_task',
    keyword_arg_using_your_inputs="{{ ti.xcom_pull(task_ids='input_operator_task') }}",
    dag=dag,
)
The XCOM pull is evaluated through the Jinja template.
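To make the rendering step concrete, here's a toy stand-in for what happens at render time (a minimal fake, not Airflow's actual Jinja setup). One thing worth noting: the substitution yields a string.

```python
# Toy stand-in for template rendering -- illustrative only. The keyword
# argument in the DAG above is just a string; shortly before execute()
# runs, it gets rendered, calling xcom_pull on the task instance and
# substituting the result in as a *string*.

xcoms = {"input_operator_task": {"input_key": "input_value"}}

class FakeTaskInstance:
    def xcom_pull(self, task_ids):
        return xcoms[task_ids]

def render(template, ti):
    # Minimal substitute for Jinja: only handles the one expression used
    # in the example above.
    expr = "{{ti.xcom_pull(task_ids='input_operator_task')}}"
    if template == expr:
        return str(ti.xcom_pull(task_ids="input_operator_task"))
    return template

rendered = render("{{ti.xcom_pull(task_ids='input_operator_task')}}",
                  FakeTaskInstance())
assert rendered == "{'input_key': 'input_value'}"
```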
Let me know if that helps, or if I completely misunderstood :)
Joe Nap
On Wed, Aug 3, 2016 at 5:29 PM, Andrew Phillips <an...@apache.org> wrote:
> Hi all
>
> What is/are the suggested way(s) of passing "input parameters" to a DAG
> run (adding quotes since, as far as we can tell, that concept doesn't exist
> natively in Airflow, probably by design)?
>
> This would be information that is used by one or multiple operators in a
> DAG run and that should not change for all task instances in that DAG run,
> but may be different for another DAG run executing concurrently. An example
> would be a Git pull request number.
>
> What we tried first was to use a Variable for this, but it doesn't look
> like that will work because the value can change during the execution of
> the DAG run. At least, that seems to be the case in the way we're using it:
>
> input_params = Variable.get(<variable_for_dag>)
> dag = DAG(..., params=input_params)
>
> We had hoped that this would "fix" the values of the parameters when the
> DAG run was created, but that does not seem to be the case: if the variable
> is updated (in preparation for a new DAG run) while a DAG run is active,
> tasks that haven't executed yet see the new value. I.e. we end up seeing
> this:
>
> set Variable my_param to "foo"
> dag_run_1 starts, gets the variable and passes my_param to the Dag object
> dag_run_1.op_1 evaluates {{ params.my_param }} and gets "foo"
> set Variable my_param to "bar"
> dag_run_2 starts and passes var to the Dag object
> dag_run_1.op_2 evaluates {{ params.my_param }} and sees "bar" # want this
> to still be foo!
>
> Not sure at this point whether this is a bug or, if not, whether there's a
> different way to retrieve the value of a variable that allows us to "fix"
> it for the duration of the DAG run.
>
> Or, taking a step back, is there some other approach that we could use to
> store and retrieve input data to DAGs?
>
> Regards
>
> ap
>
>
>
--
*Joe Napolitano *| Sr. Data Engineer
www.blueapron.com | 5 Crosby Street, New York, NY 10013