You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "Tejas (JIRA)" <ji...@apache.org> on 2018/01/18 09:15:00 UTC

[jira] [Created] (AIRFLOW-2012) Jinja2 unable to access user_defined_macros when run with catchup

Tejas created AIRFLOW-2012:
------------------------------

             Summary: Jinja2 unable to access user_defined_macros when run with catchup
                 Key: AIRFLOW-2012
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2012
             Project: Apache Airflow
          Issue Type: Bug
          Components: backfill
    Affects Versions: 1.9.0
         Environment: Docker: Modified version of https://github.com/puckel/docker-airflow
            Reporter: Tejas


The example.py with DAG definition is as follows

{code:python}
SCHEDULE_INTERVAL = '@daily'
launch_date = dt(2018, 1, 10)

def join_array(x):
  if type(x) == list:
    return ",".join(x)
  elif type(x) == str:
    return x

dag = DAG(DAGNAME, description = 'Backfill',
  schedule_interval = SCHEDULE_INTERVAL,
  start_date =launch_date,
  end_date = dt.today(),
  catchup = True,
  user_defined_macros = {'join_array': join_array},
  max_active_runs = 1)

run_scope_task = \
    PostgresInsertOperator(task_id='run_scope',
       dag=dag,
       target_table=scope_run,
       insert_object='sql/new_property/scope_run.sql',
       params={'x': ['7813']},
       analyze=True)
{code}

Steps to reproduce:
1. The task executes a SQL with `{{ join_array(params.x) }}` in the SQL template. For trial purposes, `SCHEDULE_INTERVAL = None` is set. The template picks up the user defined macro and executes successfully. 

2. With the same template, turn on schedule to `SCHEDULE_INTERVAL = '@daily'`. The task fails with follows error in Jinja.

{code}
jinja2.exceptions.UndefinedError: 'airflow.models.DAG object' has no attribute 'join_array'
[2018-01-18 06:38:06,182] {models.py:1624} INFO - Marking task as FAILED.
[2018-01-18 06:38:06,202] {models.py:1644} ERROR - 'airflow.models.DAG object' has no attribute 'join_array'
[2018-01-18 06:38:06,206] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-01-18 06:38:06,206] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/bin/airflow", line 27, in <module>
[2018-01-18 06:38:06,207] {base_task_runner.py:98} INFO - Subtask:     args.func(args)
[2018-01-18 06:38:06,207] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python3.6/dist-packages/airflow/bin/cli.py", line 392, in run
[2018-01-18 06:38:06,207] {base_task_runner.py:98} INFO - Subtask:     pool=args.pool,
[2018-01-18 06:38:06,207] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 50, in wrapper
[2018-01-18 06:38:06,207] {base_task_runner.py:98} INFO - Subtask:     result = func(*args, **kwargs)
[2018-01-18 06:38:06,207] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python3.6/dist-packages/airflow/models.py", line 1478, in _run_raw_task
[2018-01-18 06:38:06,207] {base_task_runner.py:98} INFO - Subtask:     self.render_templates()
[2018-01-18 06:38:06,208] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python3.6/dist-packages/airflow/models.py", line 1761, in render_templates
[2018-01-18 06:38:06,208] {base_task_runner.py:98} INFO - Subtask:     rendered_content = rt(attr, content, jinja_context)
[2018-01-18 06:38:06,208] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python3.6/dist-packages/airflow/models.py", line 2484, in render_template
[2018-01-18 06:38:06,208] {base_task_runner.py:98} INFO - Subtask:     return self.render_template_from_field(attr, content, context, jinja_env)
[2018-01-18 06:38:06,208] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python3.6/dist-packages/airflow/models.py", line 2454, in render_template_from_field
[2018-01-18 06:38:06,208] {base_task_runner.py:98} INFO - Subtask:     result = jinja_env.from_string(content).render(**context)
[2018-01-18 06:38:06,209] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python3.6/dist-packages/jinja2/environment.py", line 989, in render
[2018-01-18 06:38:06,209] {base_task_runner.py:98} INFO - Subtask:     return self.environment.handle_exception(exc_info, True)
[2018-01-18 06:38:06,209] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python3.6/dist-packages/jinja2/environment.py", line 754, in handle_exception
[2018-01-18 06:38:06,209] {base_task_runner.py:98} INFO - Subtask:     reraise(exc_type, exc_value, tb)
[2018-01-18 06:38:06,209] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python3.6/dist-packages/jinja2/_compat.py", line 37, in reraise
[2018-01-18 06:38:06,209] {base_task_runner.py:98} INFO - Subtask:     raise value.with_traceback(tb)
[2018-01-18 06:38:06,210] {base_task_runner.py:98} INFO - Subtask:   File "<template>", line 9, in top-level template code
[2018-01-18 06:38:06,210] {base_task_runner.py:98} INFO - Subtask: jinja2.exceptions.UndefinedError: 'airflow.models.DAG object' has no attribute 'join_array'
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)