Posted to commits@airflow.apache.org by "Galak (JIRA)" <ji...@apache.org> on 2017/11/13 15:57:00 UTC

[jira] [Created] (AIRFLOW-1814) Add op_args and op_kwargs in PythonOperator templated fields

Galak created AIRFLOW-1814:
------------------------------

             Summary: Add op_args and op_kwargs in PythonOperator templated fields
                 Key: AIRFLOW-1814
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1814
             Project: Apache Airflow
          Issue Type: Wish
          Components: operators
    Affects Versions: Airflow 1.8, 1.8.0
            Reporter: Galak
            Priority: Minor


*I'm wondering if "_op_args_" and "_op_kwargs_" PythonOperator parameters could be templated.*

I have 2 different use cases where this change could help a lot:

+1/ Provide some job execution information as a python callable argument:+
Here is a simple example:
{code}
simple_task = PythonOperator(
    task_id='simple_task',
    provide_context=True,
    python_callable=extract_data,
    op_args=[
        "my_db_connection_id",
        "select * from my_table",
        "/data/{{ dag.dag_id }}/{{ ts }}/my_export.csv"
    ],
    dag=dag
)
{code}
The "extract_data" Python function looks simple here, but it could be anything reusable across multiple DAGs.


+2/ Provide some XCom value as a python callable argument:+
Let's say I have a task that retrieves or calculates a value and then stores it in an XCom for further use by other tasks:
{code}
value_producer_task = PythonOperator(
    task_id='value_producer_task',
    provide_context=True,
    python_callable=produce_value,
    op_args=[
        "my_db_connection_id",
        "some_other_static_parameter",
        "my_xcom_key"
    ],
    dag=dag
)
{code}

Then I can just configure a PythonOperator task to use the produced value:
{code}
value_consumer_task = PythonOperator(
    task_id='value_consumer_task',
    provide_context=True,
    python_callable=consume_value,
    op_args=[
        "{{ task_instance.xcom_pull(task_ids=None, key='my_xcom_key') }}"
    ],
    dag=dag
)
{code}
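
The two callables in this use case could look roughly like the sketch below. It is only an illustration under stated assumptions: FakeTaskInstance is a hypothetical in-memory stand-in for the task instance found in the context, the value 42 is a placeholder, and the bodies of produce_value and consume_value are invented for the example. One point worth keeping in mind is that a templated argument is rendered to text, so the consumer receives a string and has to convert it back if it needs another type.

```python
class FakeTaskInstance:
    """Hypothetical in-memory stand-in for the task instance's XCom methods."""

    def __init__(self):
        self._store = {}

    def xcom_push(self, key, value):
        self._store[key] = value

    def xcom_pull(self, task_ids=None, key=None):
        # The real method can filter by task; this toy version only uses the key.
        return self._store.get(key)


def produce_value(conn_id, static_param, xcom_key, **context):
    """Compute a value and store it in an XCom under the key given in op_args."""
    value = 42  # placeholder for a value retrieved or calculated by the task
    context["ti"].xcom_push(key=xcom_key, value=value)


def consume_value(value, **context):
    """Receive the rendered argument, which arrives as text, and convert it."""
    return int(value) + 1


ti = FakeTaskInstance()
produce_value(
    "my_db_connection_id", "some_other_static_parameter", "my_xcom_key", ti=ti
)

# What "{{ task_instance.xcom_pull(task_ids=None, key='my_xcom_key') }}"
# would evaluate to once rendered as text:
rendered_arg = str(ti.xcom_pull(task_ids=None, key="my_xcom_key"))
result = consume_value(rendered_arg, ti=ti)
print(repr(rendered_arg), result)
```

With this approach consume_value does not need to know the XCom key or the producing task; it only sees an ordinary argument.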


I quickly tried the following class:

{code}
from airflow.operators.python_operator import PythonOperator
class MyPythonOperator(PythonOperator):
    template_fields = PythonOperator.template_fields + ('op_args', 'op_kwargs')
{code}

and it worked like a charm.
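
A likely reason the subclass is enough: before execution, the operator renders every attribute named in template_fields, and that rendering appears to recurse into lists and dicts. The sketch below mimics the behaviour with the standard library only. ToyOperator, render and render_string are hypothetical names, and the regex substitution is a simplified stand-in for Jinja, not Airflow's actual code.

```python
import re

# Toy stand-in for Jinja: replaces "{{ dotted.name }}" with a value looked up
# in a context dict. Real templating is far richer; this is an illustration.
_PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def render_string(template, context):
    """Substitute every "{{ name }}" placeholder found in a single string."""
    def lookup(match):
        value = context
        for part in match.group(1).split("."):
            value = value[part] if isinstance(value, dict) else getattr(value, part)
        return str(value)
    return _PLACEHOLDER.sub(lookup, template)


def render(value, context):
    """Render strings, recursing into lists, tuples and dicts."""
    if isinstance(value, str):
        return render_string(value, context)
    if isinstance(value, (list, tuple)):
        return type(value)(render(item, context) for item in value)
    if isinstance(value, dict):
        return {key: render(item, context) for key, item in value.items()}
    return value  # non-string leaves (ints, None, ...) pass through unchanged


class ToyOperator:
    """Hypothetical operator that renders the attributes in template_fields."""

    template_fields = ("op_args", "op_kwargs")

    def __init__(self, op_args=None, op_kwargs=None):
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}

    def render_templates(self, context):
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))


task = ToyOperator(
    op_args=["my_db_connection_id", "/data/{{ dag.dag_id }}/{{ ts }}/my_export.csv"],
    op_kwargs={"run_date": "{{ ds }}", "retries": 3},
)
task.render_templates({
    "dag": {"dag_id": "my_dag"},
    "ts": "2017-11-13T00:00:00",
    "ds": "2017-11-13",
})
print(task.op_args)
print(task.op_kwargs)
```

Only string leaves are touched, so non-string entries in op_args or op_kwargs would keep their original type.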

So could these 2 arguments be added to template_fields? Or did I miss some major drawback to this change?



