Posted to commits@airflow.apache.org by "Daniel Imberman (Jira)" <ji...@apache.org> on 2020/03/27 23:41:00 UTC

[jira] [Closed] (AIRFLOW-392) DAG runs on strange schedule in the past when deployed

     [ https://issues.apache.org/jira/browse/AIRFLOW-392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Daniel Imberman closed AIRFLOW-392.
-----------------------------------
    Resolution: Auto Closed

> DAG runs on strange schedule in the past when deployed
> ------------------------------------------------------
>
>                 Key: AIRFLOW-392
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-392
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: 1.7.1.3
>         Environment: AWS ElasticBeanstalk as a Docker application running in an Ubuntu-based container
>            Reporter: David Klosowski
>            Assignee: Norman Mu
>            Priority: Major
>
> We just deployed a new DAG ('weekly-no-track') that depends on 7 task runs of another DAG ('daily-no-track'). When the DAG is deployed, the scheduler schedules and runs multiple DAG runs in the past (yesterday it ran for 6/12/2016 and 6/05/2016), despite the start date being set to the deployment date.
> It would be difficult to include all of the code used in the DAG, since it references several Python libraries we've built in-house and hope to eventually open source. I've included some of the code here. Let me know if this is all clear and what I can do to help, or if you can offer any insight into why this is occurring and how we might fix it. A minimal sketch of how I understand the scheduler derives run dates follows the DAG code below.
> {code}
> from __future__ import division, print_function
> from airflow.models import DAG
> from airflow.operators import DummyOperator, ExternalTaskSensor, TimeDeltaSensor
> from tn_etl_tools.aws.emr import EmrConfig, HiveConfig, read_cluster_templates
> from tn_etl_tools.aws.emr import EmrService, EmrServiceWrapper, HiveStepBuilder
> from tn_etl_tools.datesupport import ts_add
> from tn_etl_tools.hive import HivePartitions
> from tn_etl_tools.yaml import YamlLoader
> from datetime import datetime, timedelta
> from dateutil.relativedelta import relativedelta, SU, MO, TU, WE, TH, FR, SA
> from common_args import merge_dicts, CommonHiveParams
> from operator_builders import add_tasks, emr_hive_operator
> import os
> # === configs
> config_dir = os.getenv('DAG_CONFIG_DIR', '/usr/local/airflow/config')
> alert_email = os.getenv('AIRFLOW_TO_EMAIL')
> app_properties = YamlLoader.load_yaml(config_dir + '/app.yml')
> emr_cluster_properties = YamlLoader.load_yaml(config_dir + '/emr_clusters.yml')
> emr_config = EmrConfig.load(STAGE=app_properties['STAGE'], **app_properties['EMR'])
> hive_config = HiveConfig.load(STAGE=app_properties['STAGE'], **app_properties['HIVE'])
> emr_cluster_templates = read_cluster_templates(emr_cluster_properties)
> # === /configs
> # TODO: force execution_date = sunday?
> run_for_date = datetime(2016, 8, 8)
> emr_service = EmrService()
> emr_service_wrapper = EmrServiceWrapper(emr_service=emr_service,
>                                         emr_config=emr_config, cluster_templates=emr_cluster_templates)
> hive_step_builder = HiveStepBuilder(hive_config=hive_config)
> hive_params = CommonHiveParams(app_properties_hive=app_properties['HIVE'])
> args = {'owner': 'airflow',
>         'depends_on_past': False,
>         'start_date': run_for_date,
>         'email': [alert_email],
>         'email_on_failure': True,
>         'email_on_retry': False,
>         'retries': 1,
>         'trigger_rule': 'all_success',
>         'emr_service_wrapper': emr_service_wrapper,
>         'hive_step_builder': hive_step_builder}
> user_defined_macros = {'hive_partitions': HivePartitions,
>                        'ts_add': ts_add}
> params = {'stage': app_properties['STAGE']}
> dag = DAG(dag_id='weekly_no_track', default_args=args, user_defined_macros=user_defined_macros, params=params,
>           schedule_interval=timedelta(days=7),
>           max_active_runs=1)
> # === task definitions
> task_definitions = {
>     'wait-for-dailies': {
>         'operator_type': 'dummy_operator', # hub for custom defined dependencies
>         'operator_args': {},
>         'depends_on': []
>     },
>     'weekly-no-track': {
>         'operator_type': 'emr_hive_operator',
>         'operator_args': {
>             'hive_step': {
>                 'script': 'weekly-no-track-airflow',  # temporary modified script with separate output path
>                 'cluster_name': 'geoprofile',
>                 'script_vars': merge_dicts(hive_params.default_params(), hive_params.rundate_params(), {
>                     'PARTITIONS': '{{hive_partitions.by_day(ts_add(ts, days=-6), ts_add(ts, days=1))}}',
>                 }),
>             }
>         },
>         'depends_on': ['wait-for-dailies']
>     }
> }
> # === /task definitions
> operator_builders = {'emr_hive_operator': emr_hive_operator,
>                      'time_delta_sensor': TimeDeltaSensor,
>                      'dummy_operator': DummyOperator}
> add_tasks(task_definitions, dag=dag, operator_builders=operator_builders)
> # === custom tasks
> downstream_task = dag.get_task('wait-for-dailies')
> for weekday in [MO, TU, WE, TH, FR, SA, SU]:
>     task_id = 'wait-for-daily-{day}'.format(day=weekday)
>     # weekday(-1) moves the date back to the most recent occurrence of the given weekday;
>     # if the date already falls on that weekday (e.g. a Monday for MO(-1)), it is left unchanged.
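>     # For example, applied to Wednesday 2016-08-10, MO(-1) yields Monday 2016-08-08 and SU(-1) yields Sunday 2016-08-07.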
>     delta = relativedelta(weekday=weekday(-1))
>     sensor = ExternalTaskSensor(task_id=task_id, dag=dag,
>                                 external_dag_id='daily_no_track', external_task_id='daily-no-track',
>                                 execution_delta=delta, timeout=86400)  # timeout in seconds (24 hours)
>     sensor.set_downstream(downstream_task)
> # === /custom tasks
> {code}
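> For context, here is how I understand the scheduler derives run dates from start_date and schedule_interval (a minimal standalone sketch with made-up dates, not our production code): each schedule period is run once, with execution_date set to the start of the period and the run triggered only after the period has ended, so every completed period between start_date and now gets backfilled.
> {code}
> from __future__ import print_function
> from datetime import datetime, timedelta
> # Hypothetical values for illustration only.
> start_date = datetime(2016, 8, 8)
> schedule_interval = timedelta(days=7)
> now = datetime(2016, 8, 23)
> # One DAG run per completed interval since start_date; execution_date is the
> # start of the interval, and the run only fires once the interval has ended.
> execution_date = start_date
> while execution_date + schedule_interval <= now:
>     print('run for execution_date', execution_date,
>           'triggered at', execution_date + schedule_interval)
>     execution_date += schedule_interval
> {code}
> As far as I understand, that would only account for runs with an execution_date on or after the start date, not the June dates we saw, which fall before it.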
> Some of the referenced code:
> {{common_args.py}}
> {code}
> from __future__ import division, print_function
> from copy import copy
> class CommonHiveParams(object):
>     def __init__(self, app_properties_hive):
>         super(CommonHiveParams, self).__init__()
>         # TODO: this should be part of a config object
>         self.app_properties_hive = app_properties_hive
>     def default_params(self):
>         return {
>             'HIVE_LIBS_BUCKET': self.app_properties_hive['S3_HIVE_LIB_BUCKET'],
>             'STAGE': '{{params.stage}}',
>         }
>     @staticmethod
>     def rundate_params():
>         return {
>             'YEAR': '{{execution_date.strftime("%Y")}}',
>             'MONTH': '{{execution_date.strftime("%m")}}',
>             'DAY': '{{execution_date.strftime("%d")}}',
>             'HOUR': '{{execution_date.strftime("%H")}}',
>             'MINUTE': '{{execution_date.strftime("%M")}}',
>         }
> def merge_dicts(*dicts):
>     """ Merge provided dicts without modification.
>     Duplicate keys are overwritten with values from the rightmost applicable dict.
>     """
>     if len(dicts) == 0:
>         return {}
>     result = copy(dicts[0])
>     for d in dicts[1:]:
>         result.update(d)
>     return result
> {code}
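> A quick usage example for {{merge_dicts}} (values made up), showing that on duplicate keys the rightmost dict wins and the inputs are left unmodified:
> {code}
> defaults = {'STAGE': 'dev', 'YEAR': '2015'}
> overrides = {'YEAR': '{{execution_date.strftime("%Y")}}'}
> merged = merge_dicts(defaults, overrides)
> # merged == {'STAGE': 'dev', 'YEAR': '{{execution_date.strftime("%Y")}}'}
> # defaults and overrides are unchanged, since merge_dicts copies before updating.
> {code}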
> {{operator_builders.py}}
> {code}
> """Functions for building operators from dict property definitions."""
> from __future__ import division, print_function
> from tn_airflow_components.operators.emr import EmrHiveOperator, create_emr_operator_with_step_sensor
> # TODO: this should not be a single package. Not every DAG needs EMR as a dependency, for example.
> def emr_hive_operator(task_id, dag, hive_step, **kwargs):
>     return create_emr_operator_with_step_sensor(task_id=task_id, dag=dag,
>                                                 main_operator_class=EmrHiveOperator, main_operator_kwargs=hive_step,
>                                                 **kwargs)
> def add_tasks(task_definitions, dag, operator_builders):
>     """Add tasks from dict definitions
>     :param task_definitions: dict of task definitions. Keys in the top-level dict are used as the task IDs
>     :type task_definitions: dict
>     :param dag: the DAG in which to define the tasks
>     :type dag: airflow.models.DAG
>     :param operator_builders: mapping of str 'operator_type' values to operator builder functions
>     :type operator_builders: dict
>     """
>     for task_id in task_definitions.keys():
>         task_definition = task_definitions[task_id]
>         operator_type = task_definition['operator_type']
>         operator = operator_builders[operator_type](task_id=task_id, dag=dag, **task_definition['operator_args'])
>         if task_definition['depends_on']:
>             for dependency in task_definition['depends_on']:
>                 operator.set_upstream(dag.get_task(dependency))
> {code}
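> And a minimal sketch of driving {{add_tasks}} with only dummy operators, so it has no EMR dependencies (the dag_id and task names are made up, and it assumes the {{add_tasks}} above is in scope):
> {code}
> from airflow.models import DAG
> from airflow.operators import DummyOperator
> from collections import OrderedDict
> from datetime import datetime
> example_dag = DAG(dag_id='add_tasks_example',
>                   default_args={'owner': 'airflow', 'start_date': datetime(2016, 8, 8)},
>                   schedule_interval=None)
> # OrderedDict so 'extract' is created before 'load' looks it up as a dependency.
> example_definitions = OrderedDict([
>     ('extract', {'operator_type': 'dummy_operator', 'operator_args': {}, 'depends_on': []}),
>     ('load', {'operator_type': 'dummy_operator', 'operator_args': {}, 'depends_on': ['extract']}),
> ])
> add_tasks(example_definitions, dag=example_dag,
>           operator_builders={'dummy_operator': DummyOperator})
> # 'load' is now downstream of 'extract' in example_dag.
> {code}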



--
This message was sent by Atlassian Jira
(v8.3.4#803005)