Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/04/06 20:59:28 UTC

[GitHub] [airflow] austynh opened a new issue #8168: Database table `dag_run` record is inserted with incorrect data for nested SubDags.

austynh opened a new issue #8168: Database table `dag_run` record is inserted with incorrect data for nested SubDags.
URL: https://github.com/apache/airflow/issues/8168
 
 
   **Apache Airflow version**: `>=1.10.3`
   
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**: Google Cloud Platform (GCP) - Google Kubernetes Engine (GKE)
   - **OS** (e.g. from /etc/os-release): Debian GNU/Linux 9 (stretch)
   - **Kernel** (e.g. `uname -a`): Linux cs-6000-devshell-vm-3b3f0888-f5b2-417d-a388-6519df38bad1 4.19.112+ #1 SMP Thu Mar 26 23:42:41 PDT 2020 x86_64 GNU/Linux
   - **Install tools**: Conda & Pip
   - **Others**: None
   - **Database**: PostgreSQL
   
   **What happened**:
   
   Upgrading Airflow to a version `>=1.10.3` broke the rendering of `dag_run.conf` values in SubDags. The `dag_run.conf` values, which are passed into the DAG using the Airflow CLI, render as Null or empty in one of the SubDags: the first nested SubDag.
   
   Further investigation revealed that the `dag_run` table record for the _first_ nested SubDag receives incorrect values.
   That record is written with the following incorrect fields:
   
   * external_trigger -> 'f' (False). Should be 't' (True) since the DAG is manually triggered.
   * run_id -> `backfill_{ TIMESTAMP }` (e.g. `backfill_2020-03-31T22:33:20+00:00`). Should be the `run_id` specified by the Airflow CLI run command.
   * conf -> Null. Should contain the `conf` passed in at runtime.
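
These three symptoms can be checked mechanically. A minimal sketch, assuming the `dag_run` rows for the parent DAG and for a SubDag have already been fetched as plain dicts (the helper name and the example values are hypothetical, not Airflow APIs), flags every field where the SubDag row diverges from the parent's, following the expected values listed above:

```python
# Hypothetical helper (not an Airflow API): compares a SubDag's `dag_run` row
# with the parent DAG's row. Rows are plain dicts keyed by the column names above.
FIELDS = ("external_trigger", "run_id", "conf")


def find_bad_fields(parent_run, subdag_run):
    """Return the fields where the SubDag row does not match the parent row."""
    return [field for field in FIELDS if subdag_run.get(field) != parent_run.get(field)]


# Parent run triggered from the CLI with an explicit run_id and conf (example values).
parent = {"run_id": "manual_test_run", "external_trigger": True,
          "conf": {"test_value": "hello"}}
# A SubDag row with the symptoms described in this report.
broken_subdag = {"run_id": "backfill_2020-03-31T22:33:20+00:00",
                 "external_trigger": False, "conf": None}

print(find_bad_fields(parent, broken_subdag))  # ['external_trigger', 'run_id', 'conf']
```
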
   
   Investigation into reproducibility showed that the issue appears to be directly related to nested SubDags. Multiple nested SubDags seem to cause an invalid `dag_run` record to be inserted for the first nested SubDag in a DAG. The issue reproduces only when all of the following conditions hold (more details in the “How to reproduce it” section):
   
   * The DAG uses nested SubDags (a SubDag inside a SubDag), the key contributing factor.
   * The DAG contains more than one nested SubDag.
   * Airflow version is `>=1.10.3`.
   
   **What you expected to happen**:
   
   The expected behavior is:
   
   * All SubDags have a valid `dag_run` record, with the correct `external_trigger`, `run_id`, and `conf` values described in the “What happened” section.
   * All SubDags correctly render `dag_run` record values (such as `dag_run.conf`) in templates.
   
   **How to reproduce it**:
   The example DAG below has the following structure:
   
       DAG:
           - subdag (1)
               - subdag (2)
                   - bash op (3)
           - subdag (5)
               - subdag (6)
                   - bash op (7)
       Where the DAG assembly is as follows (subdag (1) is upstream of subdag (5)):
           subdag (1) >> subdag (5)
   
   The `dag_run` table record for subdag (2) is written with the incorrect values described above.
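
When inspecting the `dag_run` table, it helps to know which `dag_id` each numbered SubDag in the diagram receives. The sketch below only replays the `"{parent_id}.{subdag_id}"` naming scheme used by the reproduction code below; the helper names are illustrative, not Airflow APIs. The row to check for subdag (2) is the one whose `dag_id` ends in `subdag_1_bash_op_subdag`.

```python
ROOT = "test_dropped_conf_failure_dag"


def make_subdag_id(parent_id, job_name):
    # Same "{parent_id}.{subdag_id}" pattern as the reproduction code.
    return "{parent_id}.{subdag_id}".format(parent_id=parent_id, subdag_id=job_name)


def diagram_dag_ids(root=ROOT):
    """Map the diagram labels to the dag_id each SubDag gets in the dag_run table."""
    ids = {}
    for outer, inner, job in (("subdag (1)", "subdag (2)", "subdag_1"),
                              ("subdag (5)", "subdag (6)", "subdag_2")):
        outer_id = make_subdag_id(root, job)
        ids[outer] = outer_id
        # test_subdag() nests test_subdag_with_bash_op(subdag, job_name + '_bash_op_subdag').
        ids[inner] = make_subdag_id(outer_id, job + "_bash_op_subdag")
    return ids


for label, dag_id in diagram_dag_ids().items():
    print(label, "->", dag_id)
```
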
   
   **Code for reproducing**:
   ```python
   import datetime
   from airflow import DAG
   
   from airflow.operators.subdag_operator import SubDagOperator
   from airflow.operators.bash_operator import BashOperator
   
   
   def test_subdag(parent_dag, job_name):
       """Outer SubDag, e.g. subdag (1), that wraps a nested SubDag, e.g. subdag (2)."""
       subdag_id = job_name
       subdag_name = (
           "{parent_id}.{subdag_id}"
           .format(parent_id=parent_dag.dag_id, subdag_id=subdag_id)
       )
   
       with DAG(subdag_name, schedule_interval=parent_dag.schedule_interval,
                default_args=parent_dag.default_args,
                user_defined_macros=parent_dag.user_defined_macros) as subdag:
   
           # Nested SubDag: the structure this issue depends on.
           test_subdag_with_bash_op(subdag, job_name + '_bash_op_subdag')
   
           # dag=parent_dag attaches this operator to the parent, not to `subdag`.
           return SubDagOperator(subdag=subdag,
                                 task_id=subdag_id,
                                 dag=parent_dag)
   
   
   def test_subdag_with_bash_op(parent_dag, job_name):
       """Nested SubDag whose only task echoes a value from `dag_run.conf`."""
       subdag_id = job_name
       subdag_name = (
           "{parent_id}.{subdag_id}"
           .format(parent_id=parent_dag.dag_id, subdag_id=subdag_id)
       )
   
       with DAG(subdag_name, schedule_interval=parent_dag.schedule_interval,
                default_args=parent_dag.default_args,
                user_defined_macros=parent_dag.user_defined_macros) as subdag:
   
           # Renders empty in the first nested SubDag on Airflow >= 1.10.3.
           BashOperator(
               task_id='bash_cmd',
               bash_command='echo {{ dag_run.conf.test_value }}',
               dag=subdag,
           )
   
           return SubDagOperator(subdag=subdag,
                                 task_id=subdag_id,
                                 dag=parent_dag)
   
   
   DEFAULT_ARGS = {
       'owner': 'bombora',
       'depends_on_past': False,
       'start_date': datetime.datetime(2018, 5, 28),
       'email_on_failure': False,
       'email_on_retry': False,
       'retries': 0
   }
   
   
   # schedule_interval=None: the DAG only runs when triggered, e.g. from the CLI with a conf.
   with DAG('test_dropped_conf_failure_dag',
            catchup=False,
            default_args=DEFAULT_ARGS,
            schedule_interval=None) as dag:
   
       first_subdag = test_subdag(dag, 'subdag_1')   # subdag (1) > subdag (2) > bash op (3)
       second_subdag = test_subdag(dag, 'subdag_2')  # subdag (5) > subdag (6) > bash op (7)
   
       first_subdag >> second_subdag
   ```
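
The issue shows up when the DAG is triggered with a CLI-supplied `conf`. A minimal sketch of that trigger step, assuming the Airflow 1.10 `airflow trigger_dag` command with its `-r` (run_id) and `-c` (conf) options; the run_id and the `test_value` payload are placeholder values, and the helper names are hypothetical. By default it only prints the shell command:

```python
import json
import shlex
import subprocess

DAG_ID = "test_dropped_conf_failure_dag"


def build_trigger_argv(run_id, conf, dag_id=DAG_ID):
    """Build the Airflow 1.10 `trigger_dag` command with an explicit run_id and conf."""
    return ["airflow", "trigger_dag", "-r", run_id, "-c", json.dumps(conf), dag_id]


def trigger(run_id, conf, dry_run=True):
    argv = build_trigger_argv(run_id, conf)
    if dry_run:
        # Print a copy-pasteable shell command instead of running it.
        print(" ".join(shlex.quote(arg) for arg in argv))
    else:
        # Requires an Airflow 1.10.x installation on PATH.
        subprocess.run(argv, check=True)
    return argv


trigger("manual_conf_test", {"test_value": "hello"})
# prints: airflow trigger_dag -r manual_conf_test -c '{"test_value": "hello"}' test_dropped_conf_failure_dag
```

Per this report, after such a run bash op (3) inside subdag (2) echoes an empty value, while bash op (7) inside subdag (6) echoes the `test_value` from `conf`.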
   
   **Why Nested SubDags Seem to be the Cause**: 
   
   Any one of the following changes prevents the invalid `dag_run` record for subdag (2):
   
   * Removal of subdag (5) fixes the issue.
   * (OR) Changing the DAG structure to the following:
   
   		DAG:
   			- subdag (1)
   				- subdag (2)
   					- bash op (3)
   			- bash_op (7)
   		Where the DAG assembly is as follows (subdag (1) is upstream of bash_op (7)):
   			subdag (1) >> bash_op (7)
   
   * (OR) Changing the DAG structure to the following:
   
   		DAG:
   			- subdag (1)
   				- subdag (2)
   					- bash op (3)
   			- subdag (6)
   				- bash_op (7)
   		Where the DAG assembly is as follows (subdag (1) is upstream of subdag (6)):
   			subdag (1) >> subdag (6)
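
Each of these workarounds leaves only one nested SubDag in the DAG. As a rough illustration, the sketch below models the diagrams above as nested Python dicts (a hypothetical representation, not Airflow objects) and lists the nested SubDags, i.e. SubDags whose parent is itself a SubDag, in each structure; only the original structure has more than one.

```python
# Each node is either a SubDag (a dict of its children) or a task (None).
ORIGINAL = {
    "subdag (1)": {"subdag (2)": {"bash op (3)": None}},
    "subdag (5)": {"subdag (6)": {"bash op (7)": None}},
}
WITHOUT_SUBDAG_5 = {
    "subdag (1)": {"subdag (2)": {"bash op (3)": None}},
}
BASH_OP_AT_TOP = {
    "subdag (1)": {"subdag (2)": {"bash op (3)": None}},
    "bash_op (7)": None,
}
FLAT_SECOND_SUBDAG = {
    "subdag (1)": {"subdag (2)": {"bash op (3)": None}},
    "subdag (6)": {"bash_op (7)": None},
}


def nested_subdags(children, inside_subdag=False):
    """List SubDags whose parent is itself a SubDag, in declaration order."""
    found = []
    for name, grandchildren in children.items():
        if grandchildren is None:  # a plain task, not a SubDag
            continue
        if inside_subdag:
            found.append(name)
        found.extend(nested_subdags(grandchildren, inside_subdag=True))
    return found


for label, structure in (("original", ORIGINAL),
                         ("without subdag (5)", WITHOUT_SUBDAG_5),
                         ("bash_op (7) at top level", BASH_OP_AT_TOP),
                         ("subdag (6) at top level", FLAT_SECOND_SUBDAG)):
    print(label, "->", nested_subdags(structure))
```
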
   
   
   **Anything else we need to know**:
   
   The issue occurs consistently across DAG runs.
   
   The issue only appears in the first nested SubDag.
   
   The issue does not occur on Airflow version `1.10.2`.
   
   This could be related to issue #7966 (https://github.com/apache/airflow/issues/7966); this report may describe a more specific case of it.
   
   
   
   Many Thanks!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [airflow] boring-cyborg[bot] commented on issue #8168: Database table `dag_run` record is inserted with incorrect data for nested SubDags.

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #8168: Database table `dag_run` record is inserted with incorrect data for nested SubDags.
URL: https://github.com/apache/airflow/issues/8168#issuecomment-610033731
 
 
   Thanks for opening your first issue here! Be sure to follow the issue template!
   
